Support new login flow via Chrome

This commit is contained in:
DevilXD
2022-11-01 17:49:21 +01:00
parent 88b228d38b
commit 73a2a6145f
5 changed files with 370 additions and 307 deletions

View File

@@ -31,7 +31,12 @@ a = Analysis(
hookspath=[],
hooksconfig={},
noarchive=False,
hiddenimports=[],
hiddenimports=[
"setuptools._distutils.log",
"setuptools._distutils.dir_util",
"setuptools._distutils.file_util",
"setuptools._distutils.archive_util",
],
runtime_hooks=[],
cipher=block_cipher,
win_no_prefer_redirects=False,

204
gui.py
View File

@@ -1,18 +1,21 @@
from __future__ import annotations
import re
import sys
import json
import ctypes
import asyncio
import logging
import webbrowser
import tkinter as tk
from collections import abc
from math import log10, ceil
from collections import abc, namedtuple
from subprocess import CREATE_NO_WINDOW
from tkinter.font import Font, nametofont
from functools import partial, cached_property
from datetime import datetime, timedelta, timezone
from typing import Any, TypedDict, NoReturn, Generic, TYPE_CHECKING
from tkinter import Tk, ttk, StringVar, DoubleVar, IntVar, PhotoImage
from typing import Any, Union, Tuple, TypedDict, NoReturn, Generic, TYPE_CHECKING
import pystray
import win32api
@@ -20,13 +23,18 @@ import win32con
import win32gui
from yarl import URL
from PIL import Image as Image_module
from undetected_chromedriver import ChromeOptions
from seleniumwire.undetected_chromedriver.v2 import Chrome
from selenium.common.exceptions import WebDriverException, NoSuchWindowException
from translate import _
from cache import ImageCache
from exceptions import ExitRequest
from utils import resource_path, Game, _T
from registry import RegistryKey, ValueType
from constants import SELF_PATH, FORMATTER, WS_TOPICS_LIMIT, MAX_WEBSOCKETS, WINDOW_TITLE, State
from exceptions import MinerException, ExitRequest, LoginException
from constants import (
CLIENT_ID, SELF_PATH, FORMATTER, WS_TOPICS_LIMIT, MAX_WEBSOCKETS, WINDOW_TITLE, State
)
if TYPE_CHECKING:
from twitch import Twitch
@@ -35,9 +43,15 @@ if TYPE_CHECKING:
from inventory import DropsCampaign, TimedDrop
TK_PADDING = Union[int, Tuple[int, int], Tuple[int, int, int], Tuple[int, int, int, int]]
digits = ceil(log10(WS_TOPICS_LIMIT))
######################
# GUI ELEMENTS START #
######################
class _TKOutputHandler(logging.Handler):
def __init__(self, output: GUIManager):
super().__init__()
@@ -161,7 +175,7 @@ class PlaceholderCombobox(PlaceholderEntry, ttk.Combobox):
class PaddedListbox(tk.Listbox):
def __init__(self, master: ttk.Widget, *args, padding: tk._Padding = (0, 0, 0, 0), **kwargs):
def __init__(self, master: ttk.Widget, *args, padding: TK_PADDING = (0, 0, 0, 0), **kwargs):
# we place the listbox inside a frame with the same background
# this means we need to forward the 'grid' method to the frame, not the listbox
self._frame = tk.Frame(master)
@@ -210,7 +224,7 @@ class PaddedListbox(tk.Listbox):
self._frame.configure(frame_options)
# update padding
if "padding" in options:
padding: tk._Padding = options.pop("padding")
padding: TK_PADDING = options.pop("padding")
padx1: tk._ScreenUnits
padx2: tk._ScreenUnits
pady1: tk._ScreenUnits
@@ -221,9 +235,9 @@ class PaddedListbox(tk.Listbox):
padx1 = padx2 = pady1 = pady2 = padding
elif len(padding) == 2:
padx1 = padx2 = padding[0]
pady1 = pady2 = padding[1] # type: ignore
pady1 = pady2 = padding[1]
elif len(padding) == 3:
padx1, padx2 = padding[0:2] # type: ignore
padx1, padx2 = padding[0], padding[1]
pady1 = pady2 = padding[2] # type: ignore
else:
padx1, padx2, pady1, pady2 = padding # type: ignore
@@ -309,7 +323,9 @@ class SelectMenu(tk.Menubutton, Generic[_T]):
return self._menu_options[self.cget("text")]
# GUI ELEMENTS END / GUI DEFINITION START
###########################################
# GUI ELEMENTS END / GUI DEFINITION START #
###########################################
class StatusBar:
@@ -395,9 +411,6 @@ class WebsocketStatus:
self._topics_var.set('\n'.join(topic_lines))
LoginData = namedtuple("LoginData", ["username", "password", "token"])
class LoginForm:
def __init__(self, manager: GUIManager, master: ttk.Widget):
self._manager = manager
@@ -409,14 +422,14 @@ class LoginForm:
frame.rowconfigure(4, weight=1)
ttk.Label(frame, text=_("gui", "login", "labels")).grid(column=0, row=0)
ttk.Label(frame, textvariable=self._var, justify="center").grid(column=1, row=0)
self._login_entry = PlaceholderEntry(frame, placeholder=_("gui", "login", "username"))
self._login_entry.grid(column=0, row=1, columnspan=2)
self._pass_entry = PlaceholderEntry(
frame, placeholder=_("gui", "login", "password"), show=''
)
self._pass_entry.grid(column=0, row=2, columnspan=2)
self._token_entry = PlaceholderEntry(frame, placeholder=_("gui", "login", "twofa_code"))
self._token_entry.grid(column=0, row=3, columnspan=2)
# self._login_entry = PlaceholderEntry(frame, placeholder=_("gui", "login", "username"))
# self._login_entry.grid(column=0, row=1, columnspan=2)
# self._pass_entry = PlaceholderEntry(
# frame, placeholder=_("gui", "login", "password"), show='•'
# )
# self._pass_entry.grid(column=0, row=2, columnspan=2)
# self._token_entry = PlaceholderEntry(frame, placeholder=_("gui", "login", "twofa_code"))
# self._token_entry.grid(column=0, row=3, columnspan=2)
self._confirm = asyncio.Event()
self._button = ttk.Button(
frame, text=_("gui", "login", "button"), command=self._confirm.set, state="disabled"
@@ -424,31 +437,130 @@ class LoginForm:
self._button.grid(column=0, row=4, columnspan=2)
self.update(_("gui", "login", "logged_out"), None)
def clear(self, login: bool = False, password: bool = False, token: bool = False):
clear_all = not login and not password and not token
if login or clear_all:
self._login_entry.clear()
if password or clear_all:
self._pass_entry.clear()
if token or clear_all:
self._token_entry.clear()
@staticmethod
def interceptor(request) -> None:
if (
request.method == "POST"
and request.url == "https://passport.twitch.tv/protected_login"
):
body = request.body.decode("utf-8")
data = json.loads(body)
data["client_id"] = CLIENT_ID
request.body = json.dumps(data).encode("utf-8")
del request.headers["Content-Length"]
request.headers["Content-Length"] = str(len(request.body))
async def ask_login(self) -> LoginData:
async def ask_login(self) -> str:
self.update(_("gui", "login", "required"), None)
self._manager.print(_("gui", "login", "request"))
self._confirm.clear()
self._button.config(state="normal")
# NOTE: we need this to allow for the closing window event to break the waiting here
done, pending = await asyncio.wait(
[self._confirm.wait(), self._manager.wait_until_closed()],
return_when=asyncio.FIRST_COMPLETED,
)
for task in pending:
task.cancel()
if self._manager.close_requested:
raise ExitRequest()
self._button.config(state="disabled")
return LoginData(self._login_entry.get(), self._pass_entry.get(), self._token_entry.get())
# open the chrome browser on the Twitch's login page
# use a separate executor to void blocking the event loop
loop = asyncio.get_running_loop()
driver = None
try:
version_main = None
for attempt in range(2):
options = ChromeOptions()
options.add_argument("--log-level=3")
options.add_argument("--disable-web-security")
options.add_argument("--allow-running-insecure-content")
options.add_argument("--lang=en")
options.add_argument("--no-sandbox")
options.add_argument("--disable-gpu")
try:
driver_coro = loop.run_in_executor(
None,
lambda: Chrome(
options=options,
# use_subprocess=True,
suppress_welcome=True,
version_main=version_main,
service_creationflags=CREATE_NO_WINDOW,
)
)
done, pending = await asyncio.wait(
[driver_coro, self._manager.wait_until_closed()],
return_when=asyncio.FIRST_COMPLETED
)
for task in pending:
task.cancel()
if self._manager.close_requested:
raise ExitRequest()
driver = next(iter(done)).result()
break
except WebDriverException as exc:
message = exc.msg
if (
message is not None
and (match := re.search(
r'Chrome version ([\d]+)\nCurrent browser version is ((\d+)\.[\d.]+)',
message,
)) is not None
):
if not attempt:
version_main = int(match.group(3))
continue
else:
raise MinerException(
"Your Chrome browser is out of date\n"
f"Required version: {match.group(1)}\n"
f"Current version: {match.group(2)}"
) from None
raise MinerException(
"An error occured while boostrapping the Chrome browser"
) from exc
assert driver is not None
driver.request_interceptor = self.interceptor
page_coro = loop.run_in_executor(None, driver.get, "https://twitch.tv/login")
done, pending = await asyncio.wait(
[page_coro, self._manager.wait_until_closed()],
return_when=asyncio.FIRST_COMPLETED
)
for task in pending:
task.cancel()
if self._manager.close_requested:
raise ExitRequest()
# enable the login button once the page finishes opening
self._button.config(state="normal")
# auto login
# driver.find_element("id", "login-username").send_keys(username)
# driver.find_element("id", "password-input").send_keys(password)
# driver.find_element(
# "css selector", '[data-a-target="passport-login-button"]'
# ).click()
# token submit button css selectors
# Button: "screen="two_factor" target="submit_button"
# Input: <input type="text" autocomplete="one-time-code" data-a-target="tw-input"
# inputmode="numeric" pattern="[0-9]*" value="">
# wait for the user to navigate away from the URL, indicating successful login
# alternatively, they can press on the login button
while (
driver.current_url != "https://www.twitch.tv/?no-reload=true"
and not self._confirm.is_set()
):
if self._manager.close_requested:
raise ExitRequest()
await asyncio.sleep(0.5)
cookies = driver.get_cookies()
for cookie in cookies:
if cookie["name"] == "auth-token":
auth_token = cookie["value"]
break
else:
raise LoginException("Unable to extract authorization token")
except NoSuchWindowException:
driver = None
raise LoginException("Chrome window was closed, reopening...")
finally:
self._button.config(state="disabled")
if driver is not None:
driver.quit()
return auth_token
def update(self, status: str, user_id: int | None):
if user_id is not None:
@@ -1643,6 +1755,11 @@ class HelpTab:
).grid(sticky="nsew")
##########################################
# GUI DEFINITION END / GUI MANAGER START #
##########################################
class GUIManager:
def __init__(self, twitch: Twitch):
self._twitch: Twitch = twitch
@@ -1887,6 +2004,11 @@ class GUIManager:
self.output.print(*args, **kwargs)
###################
# GUI MANAGER END #
###################
if __name__ == "__main__":
# Everything below is for debug purposes only
import aiohttp

366
main.py
View File

@@ -1,198 +1,204 @@
from __future__ import annotations
import io
import sys
import signal
import asyncio
import logging
import argparse
import traceback
import tkinter as tk
from tkinter import messagebox
from typing import IO, NoReturn
# import an additional thing for proper PyInstaller freeze support
from multiprocessing import freeze_support
# pre-import 3rd party libraries, handling missing ones with an exception
try:
import aiohttp # noqa
except ModuleNotFoundError as exc:
raise ImportError("You have to run 'pip install aiohttp' first") from exc
try:
import pystray # noqa
except ModuleNotFoundError as exc:
raise ImportError("You have to run 'pip install pystray' first") from exc
try:
import PIL # noqa
except ModuleNotFoundError as exc:
raise ImportError("You have to run 'pip install pillow' first") from exc
try:
import win32gui # noqa
except ModuleNotFoundError as exc:
raise ImportError("You have to run 'pip install pywin32' first") from exc
if __name__ == "__main__":
freeze_support()
import io
import sys
import signal
import asyncio
import logging
import argparse
import traceback
import tkinter as tk
from tkinter import messagebox
from typing import IO, NoReturn
from translate import _
from twitch import Twitch
from settings import Settings
from version import __version__
from exceptions import CaptchaRequired
from constants import SELF_PATH, FORMATTER, LOG_PATH, WINDOW_TITLE
# pre-import 3rd party libraries, handling missing ones with an exception
try:
import aiohttp # noqa
except ModuleNotFoundError as exc:
raise ImportError("You have to run 'pip install aiohttp' first") from exc
try:
import pystray # noqa
except ModuleNotFoundError as exc:
raise ImportError("You have to run 'pip install pystray' first") from exc
try:
import PIL # noqa
except ModuleNotFoundError as exc:
raise ImportError("You have to run 'pip install pillow' first") from exc
try:
import win32gui # noqa
except ModuleNotFoundError as exc:
raise ImportError("You have to run 'pip install pywin32' first") from exc
from translate import _
from twitch import Twitch
from settings import Settings
from version import __version__
from exceptions import CaptchaRequired
from constants import SELF_PATH, FORMATTER, LOG_PATH, WINDOW_TITLE
class Parser(argparse.ArgumentParser):
def __init__(self, *args, **kwargs) -> None:
super().__init__(*args, **kwargs)
self._message: io.StringIO = io.StringIO()
class Parser(argparse.ArgumentParser):
def __init__(self, *args, **kwargs) -> None:
super().__init__(*args, **kwargs)
self._message: io.StringIO = io.StringIO()
def _print_message(self, message: str, file: IO[str] | None = None) -> None:
self._message.write(message)
# print(message, file=self._message)
def _print_message(self, message: str, file: IO[str] | None = None) -> None:
self._message.write(message)
# print(message, file=self._message)
def exit(self, status: int = 0, message: str | None = None) -> NoReturn:
try:
super().exit(status, message) # sys.exit(2)
finally:
messagebox.showerror("Argument Parser Error", self._message.getvalue())
def exit(self, status: int = 0, message: str | None = None) -> NoReturn:
try:
super().exit(status, message) # sys.exit(2)
finally:
messagebox.showerror("Argument Parser Error", self._message.getvalue())
class ParsedArgs(argparse.Namespace):
_verbose: int
_debug_ws: bool
_debug_gql: bool
log: bool
tray: bool
no_run_check: bool
class ParsedArgs(argparse.Namespace):
_verbose: int
_debug_ws: bool
_debug_gql: bool
log: bool
tray: bool
no_run_check: bool
# TODO: replace int with union of literal values once typeshed updates
@property
def logging_level(self) -> int:
return {
0: logging.ERROR,
1: logging.WARNING,
2: logging.INFO,
3: logging.DEBUG,
}[min(self._verbose, 3)]
# TODO: replace int with union of literal values once typeshed updates
@property
def logging_level(self) -> int:
return {
0: logging.ERROR,
1: logging.WARNING,
2: logging.INFO,
3: logging.DEBUG,
}[min(self._verbose, 3)]
@property
def debug_ws(self) -> int:
"""
If the debug flag is True, return DEBUG.
If the main logging level is DEBUG, return INFO to avoid seeing raw messages.
Otherwise, return NOTSET to inherit the global logging level.
"""
if self._debug_ws:
return logging.DEBUG
elif self._verbose >= 3:
return logging.INFO
return logging.NOTSET
@property
def debug_ws(self) -> int:
"""
If the debug flag is True, return DEBUG.
If the main logging level is DEBUG, return INFO to avoid seeing raw messages.
Otherwise, return NOTSET to inherit the global logging level.
"""
if self._debug_ws:
return logging.DEBUG
elif self._verbose >= 3:
return logging.INFO
return logging.NOTSET
@property
def debug_gql(self) -> int:
if self._debug_gql:
return logging.DEBUG
elif self._verbose >= 3:
return logging.INFO
return logging.NOTSET
@property
def debug_gql(self) -> int:
if self._debug_gql:
return logging.DEBUG
elif self._verbose >= 3:
return logging.INFO
return logging.NOTSET
# handle input parameters
# NOTE: parser output is shown via message box
# we also need a dummy invisible window for the parser
root = tk.Tk()
root.overrideredirect(True)
root.withdraw()
root.update()
parser = Parser(
SELF_PATH.name,
description="A program that allows you to mine timed drops on Twitch.",
)
parser.add_argument("--version", action="version", version=f"v{__version__}")
parser.add_argument("-v", dest="_verbose", action="count", default=0)
parser.add_argument("--tray", action="store_true")
parser.add_argument("--log", action="store_true")
# undocumented debug args
parser.add_argument(
"--no-run-check", dest="no_run_check", action="store_true", help=argparse.SUPPRESS
)
parser.add_argument("--debug-ws", dest="_debug_ws", action="store_true", help=argparse.SUPPRESS)
parser.add_argument("--debug-gql", dest="_debug_gql", action="store_true", help=argparse.SUPPRESS)
args = parser.parse_args(namespace=ParsedArgs())
# load settings
try:
settings = Settings(args)
except Exception:
messagebox.showerror(
"Settings error",
f"There was an error while loading the settings file:\n\n{traceback.format_exc()}"
# handle input parameters
# NOTE: parser output is shown via message box
# we also need a dummy invisible window for the parser
root = tk.Tk()
root.overrideredirect(True)
root.withdraw()
root.update()
parser = Parser(
SELF_PATH.name,
description="A program that allows you to mine timed drops on Twitch.",
)
sys.exit(4)
# dummy window isn't needed anymore
root.destroy()
parser.add_argument("--version", action="version", version=f"v{__version__}")
parser.add_argument("-v", dest="_verbose", action="count", default=0)
parser.add_argument("--tray", action="store_true")
parser.add_argument("--log", action="store_true")
# undocumented debug args
parser.add_argument(
"--no-run-check", dest="no_run_check", action="store_true", help=argparse.SUPPRESS
)
parser.add_argument(
"--debug-ws", dest="_debug_ws", action="store_true", help=argparse.SUPPRESS
)
parser.add_argument(
"--debug-gql", dest="_debug_gql", action="store_true", help=argparse.SUPPRESS
)
args = parser.parse_args(namespace=ParsedArgs())
# load settings
try:
settings = Settings(args)
except Exception:
messagebox.showerror(
"Settings error",
f"There was an error while loading the settings file:\n\n{traceback.format_exc()}"
)
sys.exit(4)
# dummy window isn't needed anymore
root.destroy()
# check if we're not already running
try:
exists = win32gui.FindWindow(None, WINDOW_TITLE)
except AttributeError:
# we're not on Windows - continue
exists = False
if exists and not settings.no_run_check:
# already running - exit
sys.exit(3)
# check if we're not already running
try:
exists = win32gui.FindWindow(None, WINDOW_TITLE)
except AttributeError:
# we're not on Windows - continue
exists = False
if exists and not settings.no_run_check:
# already running - exit
sys.exit(3)
# set language
try:
_.set_language(settings.language)
except ValueError:
# this language doesn't exist - stick to English
pass
# set language
try:
_.set_language(settings.language)
except ValueError:
# this language doesn't exist - stick to English
pass
# handle logging stuff
if settings.logging_level > logging.DEBUG:
# redirect the root logger into a NullHandler, effectively ignoring all logging calls
# that aren't ours. This always runs, unless the main logging level is DEBUG or lower.
logging.getLogger().addHandler(logging.NullHandler())
logger = logging.getLogger("TwitchDrops")
logger.setLevel(settings.logging_level)
if settings.log:
handler = logging.FileHandler(LOG_PATH)
handler.setFormatter(FORMATTER)
logger.addHandler(handler)
logging.getLogger("TwitchDrops.gql").setLevel(settings.debug_gql)
logging.getLogger("TwitchDrops.websocket").setLevel(settings.debug_ws)
# handle logging stuff
if settings.logging_level > logging.DEBUG:
# redirect the root logger into a NullHandler, effectively ignoring all logging calls
# that aren't ours. This always runs, unless the main logging level is DEBUG or lower.
logging.getLogger().addHandler(logging.NullHandler())
logger = logging.getLogger("TwitchDrops")
logger.setLevel(settings.logging_level)
if settings.log:
handler = logging.FileHandler(LOG_PATH)
handler.setFormatter(FORMATTER)
logger.addHandler(handler)
logging.getLogger("TwitchDrops.gql").setLevel(settings.debug_gql)
logging.getLogger("TwitchDrops.websocket").setLevel(settings.debug_ws)
# client run
exit_status = 0
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
client = Twitch(settings)
signal.signal(signal.SIGINT, lambda *_: client.gui.close())
signal.signal(signal.SIGTERM, lambda *_: client.gui.close())
try:
loop.run_until_complete(client.run())
except CaptchaRequired:
exit_status = 1
msg = _("error", "captcha")
client.prevent_close()
client.print(msg)
except Exception:
exit_status = 1
msg = "Fatal error encountered:\n"
client.prevent_close()
client.print(msg)
client.print(traceback.format_exc())
finally:
signal.signal(signal.SIGINT, signal.SIG_DFL)
signal.signal(signal.SIGTERM, signal.SIG_DFL)
client.print(_("gui", "status", "exiting"))
loop.run_until_complete(client.shutdown())
if not client.gui.close_requested:
client.print(_("status", "terminated"))
client.gui.status.update(_("gui", "status", "terminated"))
loop.run_until_complete(client.gui.wait_until_closed())
# save the application state
# NOTE: we have to do it after wait_until_closed,
# because the user can alter some settings between app termination and closing the window
client.save(force=True)
client.gui.stop()
client.gui.close_window()
loop.run_until_complete(loop.shutdown_asyncgens())
loop.close()
sys.exit(exit_status)
# client run
exit_status = 0
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
client = Twitch(settings)
signal.signal(signal.SIGINT, lambda *_: client.gui.close())
signal.signal(signal.SIGTERM, lambda *_: client.gui.close())
try:
loop.run_until_complete(client.run())
except CaptchaRequired:
exit_status = 1
msg = _("error", "captcha")
client.prevent_close()
client.print(msg)
except Exception:
exit_status = 1
msg = "Fatal error encountered:\n"
client.prevent_close()
client.print(msg)
client.print(traceback.format_exc())
finally:
signal.signal(signal.SIGINT, signal.SIG_DFL)
signal.signal(signal.SIGTERM, signal.SIG_DFL)
client.print(_("gui", "status", "exiting"))
loop.run_until_complete(client.shutdown())
if not client.gui.close_requested:
client.print(_("status", "terminated"))
client.gui.status.update(_("gui", "status", "terminated"))
loop.run_until_complete(client.gui.wait_until_closed())
# save the application state
# NOTE: we have to do it after wait_until_closed,
# because the user can alter some settings between app termination and closing the window
client.save(force=True)
client.gui.stop()
client.gui.close_window()
loop.run_until_complete(loop.shutdown_asyncgens())
loop.close()
sys.exit(exit_status)

View File

@@ -2,3 +2,6 @@ aiohttp>2.0,<4.0
Pillow
pystray
pywin32
selenium-wire
# use a fork that has an extra option of hiding the unneeded Chrome's console cmd window
git+https://github.com/sebdelsol/undetected-chromedriver

View File

@@ -22,9 +22,7 @@ from gui import GUIManager
from channel import Channel
from websocket import WebsocketPool
from inventory import DropsCampaign
from exceptions import (
MinerException, LoginException, CaptchaRequired, ExitRequest, ReloadRequest, RequestInvalid
)
from exceptions import MinerException, ExitRequest, LoginException, ReloadRequest, RequestInvalid
from utils import (
CHARS_HEX_LOWER,
timestamp,
@@ -103,89 +101,18 @@ class _AuthState:
async def _login(self) -> str:
logger.debug("Login flow started")
login_form: LoginForm = self._twitch.gui.login
gui_print = self._twitch.gui.print
payload: JsonType = {
"client_id": CLIENT_ID,
"undelete_user": False,
"remember_me": True,
# 'force_twitchguard': False,
}
while True:
username, password, token = await login_form.ask_login()
payload["username"] = username
payload["password"] = password
# remove stale 2FA tokens, if present
payload.pop("authy_token", None)
payload.pop("twitchguard_code", None)
for attempt in range(2):
async with self._twitch.request(
"POST", "https://passport.twitch.tv/login", json=payload
) as response:
login_response: JsonType = await response.json()
# Feed this back in to avoid running into CAPTCHA if possible
if "captcha_proof" in login_response:
payload["captcha"] = {"proof": login_response["captcha_proof"]}
# Error handling
if "error_code" in login_response:
error_code: int = login_response["error_code"]
logger.debug(f"Login error code: {error_code}")
if error_code in (1000, 5027):
# we've failed bois
session = await self._twitch.get_session()
jar = cast(aiohttp.CookieJar, session.cookie_jar)
assert BASE_URL.host is not None
jar.clear_domain(BASE_URL.host)
logger.debug("Login failed due to CAPTCHA")
raise CaptchaRequired()
elif error_code == 3001:
# wrong password you dummy
logger.debug("Login failed due to incorrect username or password")
gui_print(_("login", "incorrect_login_pass"))
login_form.clear(password=True)
break
elif error_code in (
3012, # Invalid authy token
3023, # Invalid email code
):
logger.debug("Login failed due to incorrect 2FA code")
if error_code == 3023:
gui_print(_("login", "incorrect_email_code"))
else:
gui_print(_("login", "incorrect_twofa_code"))
login_form.clear(token=True)
break
elif error_code in (
3011, # Authy token needed
3022, # Email code needed
):
# 2FA handling
logger.debug("2FA token required")
email = error_code == 3022
if not token:
# user didn't provide a token, so ask them for it
if email:
gui_print(_("login", "email_code_required"))
else:
gui_print(_("login", "twofa_code_required"))
break
if email:
payload["twitchguard_code"] = token
else:
payload["authy_token"] = token
continue
else:
raise LoginException(login_response["error"])
# Success handling
if "access_token" in login_response:
# we're in bois
self.access_token = cast(str, login_response["access_token"])
logger.debug("Access token granted")
login_form.clear()
return self.access_token
for attempt in range(1, 6): # 5 attempts
try:
self.access_token = await login_form.ask_login()
break
except LoginException as exc:
if attempt < 5:
self._twitch.gui.print(exc.args[0])
continue
raise # reraise on and past the 5th attempt
logger.debug("Access token granted")
return self.access_token
def gql_headers(self, *, integrity: bool) -> JsonType:
headers = {