First release

This commit is contained in:
DevilXD
2021-12-03 17:48:51 +01:00
commit fac2788e4b
12 changed files with 1164 additions and 0 deletions

7
.gitignore vendored Normal file
View File

@@ -0,0 +1,7 @@
__pycache__
.mypy_cache
/build
/dist
/cookies.pickle
/settings.json
/*.spec

23
.vscode/launch.json vendored Normal file
View File

@@ -0,0 +1,23 @@
{
// Use IntelliSense to learn about possible attributes.
// Hover to view descriptions of existing attributes.
// For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
"version": "0.2.0",
"configurations": [
{
"name": "Run Current File",
"type": "python",
"request": "launch",
"program": "${file}",
"console": "integratedTerminal"
},
{
"name": "Run App",
"type": "python",
"request": "launch",
"program": "main.py",
"console": "integratedTerminal",
"justMyCode": false
},
]
}

20
.vscode/tasks.json vendored Normal file
View File

@@ -0,0 +1,20 @@
{
// See https://go.microsoft.com/fwlink/?LinkId=733558
// for the documentation about the tasks.json format
"version": "2.0.0",
"tasks": [
{
"label": "Build App",
"type": "shell",
"command": "build.bat",
"problemMatcher": [],
"group": {
"kind": "build",
"isDefault": true
},
"presentation": {
"showReuseMessage": false
}
}
]
}

1
build.bat Normal file
View File

@@ -0,0 +1 @@
pyinstaller -F -n "Twitch Drops Miner (by DevilXD)" -i pickaxe.ico main.py

136
channel.py Normal file
View File

@@ -0,0 +1,136 @@
from __future__ import annotations
import re
import json
import logging
from copy import copy
from base64 import b64encode
from datetime import datetime, timezone
from typing import Any, Optional, Dict, TYPE_CHECKING
from inventory import Game
from exceptions import MinerException
from constants import BASE_URL, GQL_OPERATIONS
if TYPE_CHECKING:
from twitch import Twitch
logger = logging.getLogger("TwitchDrops")
class Stream:
def __init__(self, channel: Channel, data: Dict[str, Any]):
self._twitch = channel._twitch
self.channel = channel
stream = data["stream"]
self.broadcast_id = int(stream["id"])
self.viewer_count = stream["viewersCount"]
self.drops_enabled = any(tag["localizedName"] == "Drops Enabled" for tag in stream["tags"])
settings = data["broadcastSettings"]
self.game: Game = Game(settings["game"])
self.title = settings["title"]
self._timestamp = datetime.now(timezone.utc)
class Channel:
async def __new__(cls, *args, **kwargs):
"""
Enables __init__ to be async.
The instance is returned after initialization completes.
"""
self = super().__new__(cls)
await self.__init__(*args, **kwargs)
return self
async def __init__(self, twitch: Twitch, channel_name: str): # type: ignore
self._twitch: Twitch = twitch
self.id: int = 0 # temp, to be filled by get_stream
self.name: str = channel_name
self.url: str = f"{BASE_URL}/{channel_name}"
self._spade_url: str = await self.get_spade_url()
self.stream: Optional[Stream] = None
await self.get_stream()
@property
def online(self) -> bool:
"""
Returns True if the streamer is online and is currently streaming, False otherwise.
"""
return self.stream is not None
async def get_spade_url(self) -> str:
"""
To get this monstrous thing, you have to walk a chain of requests.
Streamer page (HTML) --parse-> Streamer Settings (JavaScript) --parse-> Spade URL
"""
async with self._twitch._session.get(self.url) as response:
streamer_html = await response.text(encoding="utf8")
match = re.search(
r'src="(https://static\.twitchcdn\.net/config/settings\.[0-9a-f]{32}\.js)"',
streamer_html,
re.I,
)
if not match:
raise MinerException("Error while spade_url extraction: step #1")
streamer_settings = match.group(1)
async with self._twitch._session.get(streamer_settings) as response:
settings_js = await response.text(encoding="utf8")
match = re.search(
r'"spade_url": ?"(https://video-edge-[.\w\-/]+\.ts)"', settings_js, re.I
)
if not match:
raise MinerException("Error while spade_url extraction: step #2")
return match.group(1)
async def get_stream(self) -> Optional[Stream]:
op = copy(GQL_OPERATIONS["GetStreamInfo"].with_variables({"channel": self.name}))
response = await self._twitch.gql_request(op)
if response:
stream_data = response["data"]["user"]
self.id = int(stream_data["id"]) # fill channel_id
if stream_data["stream"]:
self.stream = Stream(self, stream_data)
else:
self.stream = None
return self.stream
async def check_online(self) -> bool:
stream = await self.get_stream()
if stream is None:
return False
return True
def set_offline(self):
# to be called externally, if we receive an event about this happening
self.stream = None
def _encode_payload(self):
assert self.stream is not None
assert self._twitch._user_id is not None
payload = [
{
"event": "minute-watched",
"properties": {
"channel_id": self.id,
"broadcast_id": self.stream.broadcast_id,
"player": "site",
"user_id": self._twitch._user_id,
}
}
]
json_event = json.dumps(payload, separators=(",", ":"))
return {"data": (b64encode(json_event.encode("utf8"))).decode("utf8")}
async def _send_watch(self):
"""
This uses the encoded payload on spade url to simulate watching the stream.
Optimally, send every 60 seconds to advance drops.
"""
if not self.online:
return
logger.debug(f"Sending minute-watched to {self.name}")
async with self._twitch._session.post(
self._spade_url, data=self._encode_payload()
) as response:
return response.status == 204

138
constants.py Normal file
View File

@@ -0,0 +1,138 @@
from __future__ import annotations
from copy import copy
from datetime import timedelta
from typing import Any, Optional, Union, Dict, Callable
BASE_URL = "https://twitch.tv"
WEBSOCKET_URL = "wss://pubsub-edge.twitch.tv/v1"
GQL_URL = "https://gql.twitch.tv/gql"
CLIENT_ID = "kimne78kx3ncx6brgo4mv6wki5h1ko"
USER_AGENT = "Twitch Drops App"
SETTINGS_PATH = "settings.json"
COOKIES_PATH = "cookies.pickle"
PING_INTERVAL = timedelta(minutes=3)
class GQLOperation(Dict[str, Any]):
def __init__(self, name: str, sha256: str, *, variables: Optional[Dict[str, Any]] = None):
super().__init__(
operationName=name,
extensions={
"persistedQuery": {
"version": 1,
"sha256Hash": sha256,
}
}
)
if variables is not None:
super().__setitem__("variables", variables)
def with_variables(self, variables: Dict[str, Any]):
modified = copy(self)
modified["variables"] = variables
return modified
GQL_OPERATIONS: Dict[str, GQLOperation] = {
"IsStreamLive": GQLOperation(
"WithIsStreamLiveQuery",
"04e46329a6786ff3a81c01c50bfa5d725902507a0deb83b0edbf7abe7a3716ea",
),
"GetStreamInfo": GQLOperation( # used
"VideoPlayerStreamInfoOverlayChannel",
"a5f2e34d626a9f4f5c0204f910bab2194948a9502089be558bb6e779a9e1b3d2",
),
"ClaimCommunityPoints": GQLOperation( # used
"ClaimCommunityPoints",
"46aaeebe02c99afdf4fc97c7c0cba964124bf6b0af229395f1f6d1feed05b3d0",
),
"ClaimDrop": GQLOperation( # used
"DropsPage_ClaimDropRewards",
"2f884fa187b8fadb2a49db0adc033e636f7b6aaee6e76de1e2bba9a7baf0daf6",
),
"ChannelPointsContext": GQLOperation( # used
"ChannelPointsContext",
"9988086babc615a918a1e9a722ff41d98847acac822645209ac7379eecb27152",
),
"ModViewChannelQuery": GQLOperation(
"ModViewChannelQuery",
"df5d55b6401389afb12d3017c9b2cf1237164220c8ef4ed754eae8188068a807",
),
"Inventory": GQLOperation( # used
"Inventory",
"e0765ebaa8e8eeb4043cc6dfeab3eac7f682ef5f724b81367e6e55c7aef2be4c",
),
"ViewerDropsDashboard": GQLOperation(
"ViewerDropsDashboard",
"c4d61d7b71d03b324914d3cf8ca0bc23fe25dacf54120cc954321b9704a3f4e2",
),
"DropCampaignDetails": GQLOperation(
"DropCampaignDetails",
"14b5e8a50777165cfc3971e1d93b4758613fe1c817d5542c398dce70b7a45c05",
),
"AvailableDrops": GQLOperation(
"DropsHighlightService_AvailableDrops",
"b19ee96a0e79e3f8281c4108bc4c7b3f232266db6f96fd04a339ab393673a075",
),
# Use for replace https://api.twitch.tv/helix/users?login={self.username}
"ReportMenuItem": GQLOperation(
"ReportMenuItem",
"8f3628981255345ca5e5453dfd844efffb01d6413a9931498836e6268692a30c",
),
"PersonalSections": GQLOperation(
"PersonalSections",
"9fbdfb00156f754c26bde81eb47436dee146655c92682328457037da1a48ed39",
variables={
"input": {
"sectionInputs": ["FOLLOWED_SECTION"],
"recommendationContext": {"platform": "web"},
},
"channelLogin": None,
"withChannelUser": False,
"creatorAnniversariesExperimentEnabled": False,
},
),
}
def get_topic(
topic_name: str, target_id: Union[str, int], process: Callable[[Dict[str, Any]], Any]
) -> WebsocketTopic:
return WebsocketTopic(f"{WEBSOCKET_TOPICS[topic_name]}.{target_id}", process)
class WebsocketTopic:
def __init__(self, topic_id: str, process: Callable[[Dict[str, Any]], Any]):
self.id: str = topic_id
self.process: Callable[[Dict[str, Any]], Any] = process
def __str__(self) -> str:
return self.id
def __eq__(self, other):
if isinstance(other, str):
return self.id == other
elif isinstance(other, WebsocketTopic):
return self.id == other.id
return NotImplemented
def __hash__(self) -> int:
return hash(self.id)
WEBSOCKET_TOPICS: Dict[str, str] = {
# Using user_id
"UserDrops": "user-drop-events",
"UserStreamState": "stream-change-v1",
"UserCommunityPoints": "community-points-user-v1",
"Presence": "presence",
"Notifications": "onsite-notifications",
# Using channel_id
"ChannelDrops": "channel-drop-events",
"ChannelStreamState": "stream-change-by-channel",
"ChannelCommunityPoints": "community-points-channel-v1",
"VideoPlayback": "video-playback-by-id",
"StreamUpdate": "broadcast-settings-update",
}

32
exceptions.py Normal file
View File

@@ -0,0 +1,32 @@
class MinerException(Exception):
def __init__(self, *args: object):
if args:
super().__init__(*args)
else:
super().__init__("Unknown miner error")
class RequestException(MinerException):
def __init__(self, *args: object):
if args:
super().__init__(*args)
else:
super().__init__("Unknown error during request")
class LoginException(RequestException):
def __init__(self, *args: object):
if args:
super().__init__(*args)
else:
super().__init__("Unknown error during login")
class CaptchaRequired(LoginException):
def __init__(self):
super().__init__("Captcha is required")
class IncorrectCredentials(LoginException):
def __init__(self):
super().__init__("Incorrect username or password")

128
inventory.py Normal file
View File

@@ -0,0 +1,128 @@
from __future__ import annotations
from datetime import datetime
from typing import Any, Optional, List, Dict, TYPE_CHECKING
from constants import GQL_OPERATIONS
if TYPE_CHECKING:
from twitch import Twitch
async def claim_drop(twitch: Twitch, claim_id: str) -> bool:
op = GQL_OPERATIONS["ClaimDrop"].with_variables(
{"input": {"dropInstanceID": claim_id}}
)
response = await twitch.gql_request(op)
data = response["data"]
if "errors" in data and data["errors"]:
return False
elif "claimDropRewards" in data:
if not data["claimDropRewards"]:
return False
elif (
data["claimDropRewards"]["status"]
in ["ELIGIBLE_FOR_ALL", "DROP_INSTANCE_ALREADY_CLAIMED"]
):
return True
return False
class Game:
def __init__(self, data: Dict[str, Any]):
self.id: int = int(data["id"])
self.name: str = data["name"]
def __eq__(self, other: object):
if isinstance(other, self.__class__):
return self.id == other.id
return NotImplemented
def __hash__(self) -> int:
return self.id
class BaseDrop:
def __init__(self, campaign: DropsCampaign, data: Dict[str, Any]):
self._twitch: Twitch = campaign._twitch
self.id = data["id"]
self.name: str = data["name"]
self.campaign: DropsCampaign = campaign
self.rewards: List[str] = [b["benefit"]["name"] for b in data["benefitEdges"]]
self.starts_at: datetime = datetime.strptime(data["startAt"], "%Y-%m-%dT%H:%M:%SZ")
self.ends_at: datetime = datetime.strptime(data["endAt"], "%Y-%m-%dT%H:%M:%SZ")
# If claim_id is not None, we can use it to claim the drop
self.claim_id: Optional[str] = data["self"]["dropInstanceID"]
self.is_claimed: bool = data["self"]["isClaimed"]
self._preconditions: bool = data["self"]["hasPreconditionsMet"]
@property
def can_earn(self) -> bool:
return (
self._preconditions # preconditions are met
and self.campaign.active # campaign is active
and not self.is_claimed # drop isn't already claimed
)
@property
def can_claim(self) -> bool:
return self.claim_id is not None and not self.is_claimed
async def claim(self) -> bool:
"""
Returns True if the claim succeeded, False otherwise.
"""
if not self.can_claim:
return False
assert self.claim_id is not None
self.is_claimed = await claim_drop(self._twitch, self.claim_id)
return self.is_claimed
class TimedDrop(BaseDrop):
def __init__(self, campaign: DropsCampaign, data: Dict[str, Any]):
super().__init__(campaign, data)
self.current_minutes: int = data["self"]["currentMinutesWatched"]
self.required_minutes: int = data["requiredMinutesWatched"]
if self.is_claimed:
# correct minutes for claimed drops
self.current_minutes = self.required_minutes
@property
def remaining_minutes(self):
return self.required_minutes - self.current_minutes
@property
def progress(self):
return self.current_minutes / self.required_minutes
def update(self, message: Dict[str, Any]):
# {"type": "drop-progress", data: {"current_progress_min": 3, "required_progress_min": 10}}
# {"type": "drop-claim", data: {"drop_instance_id": ...}}
msg_type = message["type"]
if msg_type == "drop-progress":
self.current_minutes = message["data"]["current_progress_min"]
self.required_minutes = message["data"]["required_progress_min"]
elif msg_type == "drop-claim":
self.claim_id = message["data"]["drop_instance_id"]
class DropsCampaign:
def __init__(self, twitch: Twitch, data: Dict[str, Any]):
self._twitch: Twitch = twitch
self.id: str = data["id"]
self.name: str = data["name"]
self.game: Game = Game(data["game"])
self.starts_at: datetime = datetime.strptime(data["startAt"], "%Y-%m-%dT%H:%M:%SZ")
self.ends_at: datetime = datetime.strptime(data["endAt"], "%Y-%m-%dT%H:%M:%SZ")
self.status: str = data["status"]
self.timed_drops: Dict[str, TimedDrop] = {
d["id"]: TimedDrop(self, d) for d in data["timeBasedDrops"]
}
@property
def active(self):
return self.status == "ACTIVE"
def get_drop(self, drop_id: str) -> Optional[TimedDrop]:
return self.timed_drops.get(drop_id)

123
main.py Normal file
View File

@@ -0,0 +1,123 @@
from __future__ import annotations
__version__ = 1
import os
import sys
import json
import ctypes
import logging
import asyncio
import traceback
import threading
from typing import Any, Dict, NoReturn
from twitch import Twitch
from constants import SETTINGS_PATH
try:
import win32api
from win32con import CTRL_CLOSE_EVENT, CTRL_LOGOFF_EVENT, CTRL_SHUTDOWN_EVENT
except (ImportError, ModuleNotFoundError):
raise ImportError("You have to run 'python -m pip install pywin32' first")
# nice console title
try:
ctypes.windll.kernel32.SetConsoleTitleW(f"Twitch Drops Miner v{__version__} (by DevilXD)")
except AttributeError:
# ensure we're on windows and there was no import problems
print("Only Windows supported!")
quit()
assert sys.platform == "win32"
def terminate() -> NoReturn:
forever = threading.Event()
print("\nApplication Terminated.\nClose the console window to exit the application.")
forever.wait()
raise RuntimeError("Uh oh") # this will never run, solely for MyPy
# handle extra stackable '-v' parameter, that switches the logging level
logging_level = logging.ERROR
if len(sys.argv) > 1:
arg = sys.argv[1]
if arg == ("-v", "-v1"):
logging_level = logging.WARNING
elif arg in ("-vv", "-v2"):
logging_level = logging.INFO
elif arg in ("-vvv", "-v3"):
logging_level = logging.DEBUG
# handle logging stuff
logger = logging.getLogger("TwitchDrops")
handler = logging.StreamHandler(sys.stdout)
handler.setFormatter(logging.Formatter("{levelname}: {message}", style='{'))
logger.addHandler(handler)
logger.setLevel(logging_level)
# handle settings
if not os.path.isfile(SETTINGS_PATH):
default = {
"username": "YourTwitchUsername",
"channels": ["Channel1", "Channel2", "Channel3"],
}
with open(SETTINGS_PATH, 'w', encoding="utf8") as file:
json.dump(default, file, indent=4)
print(
f"File '{SETTINGS_PATH}' created.\n"
"Please modify default settings as necessary, then relaunch the application."
)
terminate()
with open(SETTINGS_PATH, 'r', encoding="utf8") as file:
settings: Dict[str, Any] = json.load(file)
required_fields = ["channels"]
for field_name in required_fields:
if field_name not in settings:
print(f"Field '{field_name}' is a required field in '{SETTINGS_PATH}'")
terminate()
# asyncio loop
loop = asyncio.get_event_loop()
# client init
client = Twitch(settings.get("username"), settings.get("password"))
# main task and it's close event
main_task = loop.create_task(client.run(settings["channels"]))
close_event = threading.Event()
def clean_exit(code: int):
if code not in (CTRL_CLOSE_EVENT, CTRL_LOGOFF_EVENT, CTRL_SHUTDOWN_EVENT):
# filter only events we want
return False
# cancel the main task - this triggers the cleanup
main_task.cancel()
# wait until cleanup completes
close_event.wait()
# tell OS that we're free to exit now
return True
# ensures clean exit upon closing the console
win32api.SetConsoleCtrlHandler(clean_exit, True)
try:
loop.run_until_complete(main_task)
except (asyncio.CancelledError, KeyboardInterrupt):
# KeyboardInterrupt causes run_until_complete to exit, but without cancelling the task.
# The loop stops and thus the task gets frozen, until the loop runs again.
# Because we don't want anything from there to actually run during cleanup,
# we need to explicitly cancel the task ourselves here.
main_task.cancel()
# cancel all other tasks
for task in asyncio.all_tasks(loop):
if not task.done():
task.cancel()
# main_task was cancelled due to program shutting down - do the cleanup
loop.run_until_complete(client.close())
# notify we're free to exit
close_event.set()
except Exception:
print("Fatal error encountered:\n")
traceback.print_exc()
terminate()
finally:
loop.run_until_complete(loop.shutdown_asyncgens())
loop.close()

BIN
pickaxe.ico Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 66 KiB

340
twitch.py Normal file
View File

@@ -0,0 +1,340 @@
from __future__ import annotations
import os
import asyncio
import logging
from yarl import URL
from getpass import getpass
from functools import partial
from typing import Any, Optional, Union, List, Dict, cast
try:
import aiohttp
except ImportError:
raise ImportError("You have to run 'python -m pip install aiohttp' first")
from channel import Channel
from websocket import Websocket
from inventory import DropsCampaign
from exceptions import LoginException, CaptchaRequired, IncorrectCredentials
from constants import (
CLIENT_ID,
USER_AGENT,
COOKIES_PATH,
GQL_URL,
GQL_OPERATIONS,
GQLOperation,
WebsocketTopic,
get_topic,
)
logger = logging.getLogger("TwitchDrops")
class Twitch:
def __init__(
self,
username: Optional[str] = None,
password: Optional[str] = None,
*,
loop: Optional[asyncio.AbstractEventLoop] = None,
):
self.username: Optional[str] = username
self.password: Optional[str] = password
# Cookies, session and auth
cookie_jar = aiohttp.CookieJar()
if os.path.isfile(COOKIES_PATH):
cookie_jar.load(COOKIES_PATH)
self._session = aiohttp.ClientSession(
cookie_jar=cookie_jar, headers={"User-Agent": USER_AGENT}, loop=loop
)
self._access_token: Optional[str] = None
self._user_id: Optional[int] = None
self._is_logged_in = asyncio.Event()
# Websocket
self.websocket = Websocket(self)
# Storing, watching and changing channels
self.channels: Dict[int, Channel] = {}
self._watching_channel: Optional[Channel] = None
self._watching_task: Optional[asyncio.Task[Any]] = None
self._channel_change = asyncio.Event()
# Inventory
self.inventory: List[DropsCampaign] = []
def wait_until_login(self):
return self._is_logged_in.wait()
async def close(self):
print("Exiting...")
self._session.cookie_jar.save(COOKIES_PATH) # type: ignore
self.stop_watching()
await self._session.close()
await self.websocket.close()
await asyncio.sleep(1) # allows aiohttp to safely close the session
@property
def currently_watching(self) -> Optional[Channel]:
return self._watching_channel
async def run(self, channels: List[str] = []):
"""
Main method that runs the whole client.
Here, we manage several things, specifically:
• Fetching the drops inventory to make sure that everything we can claim, is claimed
• Selecting a stream to watch, and watching it
• Changing the stream that's being watched if necessary
"""
# Start our websocket connection - shouldn't require task tracking
asyncio.create_task(self.websocket.connect())
# Claim the drops we can
self.inventory = await self.get_inventory()
games = set()
for campaign in self.inventory:
if campaign.status == "UPCOMING":
# we have no use in processing upcoming campaigns here
continue
for drop in campaign.timed_drops.values():
if drop.can_earn:
games.add(campaign.game)
if drop.can_claim:
await drop.claim()
# Fetch information about all channels we're supposed to handle
for channel_name in channels:
channel: Channel = await Channel(self, channel_name) # type: ignore
self.channels[channel.id] = channel
# Sub to these channel updates
topics: List[WebsocketTopic] = []
for channel_id in self.channels:
topics.append(
get_topic(
"VideoPlayback", channel_id, partial(self.process_stream_state, channel_id)
)
)
await self.websocket.add_topics(topics)
# Repeat: Change into a channel we can watch, then reset the flag
self._channel_change.set()
refresh_channels = False # we're entering having fresh channel data already
while True:
# wait for the change channel signal
await self._channel_change.wait()
for channel in self.channels.values():
if (
channel.stream is not None # steam online
and channel.stream.drops_enabled # drops are enabled
and channel.stream.game in games # streams a game we can earn drops in
):
self.watch(channel)
refresh_channels = True
self._channel_change.clear()
break
else:
# there's no available channel to watch
if refresh_channels:
# refresh the status of all channels,
# to make sure that our websocket didn't miss anything til this point
print("No suitable channel to watch, refreshing...")
for channel in self.channels.values():
await channel.get_stream()
await asyncio.sleep(0.5)
refresh_channels = False
continue
print("No suitable channel to watch, retrying in 120 seconds")
await asyncio.sleep(120)
def watch(self, channel: Channel):
if self._watching_task is not None:
self._watching_task.cancel()
async def watcher(channel: Channel):
op = GQL_OPERATIONS["ChannelPointsContext"].with_variables(
{"channelLogin": channel.name}
)
i = 0
while True:
await channel._send_watch()
if i == 0:
# ensure every 30 minutes that we don't have unclaimed points bonus
response = await self.gql_request(op)
channel_data: Dict[str, Any] = response["data"]["community"]["channel"]
claim_available: Dict[str, Any] = (
channel_data["self"]["communityPoints"]["availableClaim"]
)
if claim_available:
await self.claim_points(channel_data["id"], claim_available["id"])
logger.info("Claimed bonus points")
i = (i + 1) % 30
await asyncio.sleep(59)
print(f"Watching: {channel.name}")
self._watching_channel = channel
self._watching_task = asyncio.create_task(watcher(channel))
def stop_watching(self):
if self._watching_task is not None:
logger.warning("Watching stopped.")
self._watching_task.cancel()
self._watching_task = None
self._watching_channel = None
async def process_stream_state(self, channel_id: int, message: Dict[str, Any]):
msg_type = message["type"]
channel = self.channels.get(channel_id)
if channel is None:
logger.error(f"Stream state change for a non-existing channel: {channel_id}")
return
if msg_type == "stream-down":
logger.info(f"{channel.name} goes OFFLINE")
channel.set_offline()
if self._watching_channel is not None and self._watching_channel.id == channel_id:
# change the channel if we're currently watching it
self._channel_change.set()
elif msg_type == "stream-up":
logger.info(f"{channel.name} goes ONLINE")
# stream_up is sent before the stream actually goes online, so just wait a bit
# and check if it's actually online by then
async def online_delay(channel: Channel):
await asyncio.sleep(10)
await channel.check_online()
asyncio.create_task(online_delay(channel))
elif msg_type == "viewcount":
if channel.stream is None:
# check if we've got a view count for a stream that just started
await channel.check_online()
if channel.stream is not None:
viewers = message["viewers"]
channel.stream.viewer_count = viewers
logger.info(f"{channel.name} viewers: {viewers}")
else:
logger.error(f"Channel viewcount update for an offline stream: {channel.name}")
async def _login(self) -> str:
logger.debug("Login flow started")
if self.username is None:
self.username = input("Username: ")
if self.password is None:
self.password = getpass()
if not self.password:
# catch early empty pass
raise IncorrectCredentials()
payload: Dict[str, Any] = {
"username": self.username,
"password": self.password,
"client_id": CLIENT_ID,
"undelete_user": False,
"remember_me": True,
}
for attempt in range(10):
async with self._session.post(
"https://passport.twitch.tv/login", json=payload
) as response:
login_response = await response.json()
# Feed this back in to avoid running into CAPTCHA if possible
if "captcha_proof" in login_response:
payload["captcha"] = {"proof": login_response["captcha_proof"]}
# Error handling
if "error_code" in login_response:
error_code = login_response["error_code"]
logger.debug(f"Login error code: {error_code}")
if error_code == 1000:
# we've failed bois
logger.debug("Login failed due to CAPTCHA")
raise CaptchaRequired()
elif error_code == 3001:
# wrong password you dummy
logger.debug("Login failed due to incorrect login or pass")
print(f"Incorrect username or password.\nUsername: {self.username}")
self.password = getpass()
if not self.password:
raise IncorrectCredentials()
elif error_code in (
3011, # Authy token needed
3012, # Invalid authy token
3022, # Email code needed
3023, # Invalid email code
):
# 2FA handling
email = error_code in (3022, 3023)
logger.debug("2FA token required")
token = input("2FA token: ")
if email:
# email code
payload["twitchguard_code"] = token
else:
# authy token
payload["authy_token"] = token
continue
else:
raise LoginException(login_response["error"])
# Success handling
if "access_token" in login_response:
# we're in bois
self._access_token = login_response["access_token"]
logger.debug(f"Access token: {self._access_token}")
break
if self._access_token is None:
# this means we've ran out of retries
raise LoginException("Ran out of login retries")
return self._access_token
async def check_login(self) -> None:
if self._access_token is not None and self._user_id is not None:
# we're all good
return
# looks like we're missing something
print("Logging in")
jar = self._session.cookie_jar
cookie = jar.filter_cookies("https://twitch.tv") # type: ignore
if not cookie:
# no cookie - login
await self._login()
# store our auth token inside the cookie
cookie["auth-token"] = cast(str, self._access_token)
elif self._access_token is None:
# have cookie - get our access token
self._access_token = cookie["auth-token"].value
logger.debug("Session restored from cookie")
# validate our access token, by obtaining user_id
async with self._session.get(
"https://id.twitch.tv/oauth2/validate",
headers={"Authorization": f"OAuth {self._access_token}"}
) as response:
validate_response = await response.json()
self._user_id = cookie["persistent"] = validate_response["user_id"]
self._is_logged_in.set()
print(f"Login successful, User ID: {self._user_id}")
# update our cookie
jar.update_cookies(cookie, URL("https://twitch.tv"))
async def gql_request(self, op: GQLOperation) -> Dict[str, Any]:
await self.check_login()
headers = {
"Authorization": f"OAuth {self._access_token}",
"Client-Id": CLIENT_ID,
}
logger.debug(f"GQL Request: {op}")
async with self._session.post(GQL_URL, json=op, headers=headers) as response:
response_json = await response.json()
logger.debug(f"GQL Response: {response_json}")
return response_json
async def get_inventory(self) -> List[DropsCampaign]:
response = await self.gql_request(GQL_OPERATIONS["Inventory"])
inventory = response["data"]["currentUser"]["inventory"]
return [DropsCampaign(self, data) for data in inventory["dropCampaignsInProgress"]]
async def claim_points(self, channel_id: Union[str, int], claim_id: str):
variables = {"input": {"channelID": str(channel_id), "claimID": claim_id}}
await self.gql_request(
GQL_OPERATIONS["ClaimCommunityPoints"].with_variables(variables)
)

216
websocket.py Normal file
View File

@@ -0,0 +1,216 @@
from __future__ import annotations
import json
import random
import string
import asyncio
import logging
from typing import Any, Optional, Dict, Tuple, Set, Iterable, TYPE_CHECKING
from websockets.exceptions import ConnectionClosed, ConnectionClosedOK
from websockets.client import WebSocketClientProtocol, connect as websocket_connect
from exceptions import MinerException
from inventory import TimedDrop
from constants import WEBSOCKET_URL, PING_INTERVAL, WebsocketTopic, get_topic
if TYPE_CHECKING:
from twitch import Twitch
logger = logging.getLogger("TwitchDrops")
class Websocket:
def __init__(self, twitch: Twitch):
self._twitch = twitch
self._ws: Optional[WebSocketClientProtocol] = None
self.connected = asyncio.Event() # set when there's an active websocket connection
self.reconnect = asyncio.Event() # set when the websocket needs to reconnect
self._send_queue: asyncio.Queue[Tuple[str, Dict[str, Any]]] = asyncio.Queue()
self._recv_dict: Dict[str, asyncio.Future[Any]] = {}
self._topics: Set[WebsocketTopic] = set()
self._ping_task: Optional[asyncio.Task[Any]] = None
async def connect(self):
# ensure we're logged in before connecting
await self._twitch.wait_until_login()
assert self._twitch._user_id is not None
logger.info("Connecting to Websocket")
# Listen to our events of choice
user_id = self._twitch._user_id
# Defer topics via a task so we can proceed to the actual websocket connection
asyncio.create_task(
self.add_topics([
get_topic("UserDrops", user_id, self.process_drops),
get_topic("UserCommunityPoints", user_id, self.process_points),
])
)
# Connect/Reconnect loop
async for websocket in websocket_connect(WEBSOCKET_URL, ssl=True, ping_interval=None):
websocket.BACKOFF_MAX = 3 * 60 # type: ignore # 3 minutes
self._ws = websocket
self.reconnect.clear()
self.connected.set()
logger.info("Websocket Connected")
self._ping_task = asyncio.create_task(self._ping_loop())
try:
while not self.reconnect.is_set():
# Process receive
try:
# Wait up to 0.5s for a message we're supposed to receive
raw_message = await asyncio.wait_for(websocket.recv(), timeout=0.5)
except asyncio.TimeoutError:
# nothing - skip handling
pass
else:
# we've got something to process
message = json.loads(raw_message)
logger.debug(f"Websocket received: {message}")
msg_type = message["type"]
# handle the simple PING case
if msg_type == "PONG":
ping_future = self._recv_dict.pop("PING", None)
if ping_future is not None and not ping_future.done():
ping_future.set_result(message)
elif msg_type == "RESPONSE":
self._recv_dict.pop(message["nonce"]).set_result(message)
elif msg_type == "RECONNECT":
# We've received a reconnect request
logger.warning("Received a Websocket Reconnect Request")
self.reconnect.set()
elif msg_type == "MESSAGE":
# request the assigned topic to process the response
target_topic = message["data"]["topic"]
for topic in self._topics:
if target_topic == topic:
await topic.process(json.loads(message["data"]["message"]))
break
else:
logger.error(f"Received unknown websocket payload: {message}")
# Early exit if needed
if self.reconnect.is_set():
break
# Process send
while not self._send_queue.empty():
nonce, message = self._send_queue.get_nowait()
if nonce != "PING":
message["nonce"] = nonce
await websocket.send(json.dumps(message, separators=(',', ':')))
logger.debug(f"Websocket sent: {message}")
# A reconnect was requested
continue
except ConnectionClosed as exc:
self.connected.clear()
if self._ping_task is not None:
self._ping_task.cancel()
self._ping_task = None
if isinstance(exc, ConnectionClosedOK):
if exc.rcvd_then_sent:
# server closed the connection, not us - reconnect
logger.warning("Websocket Disconnected - Reconnecting")
continue
# we closed it - exit
break
# otherwise, reconnect
logger.warning("Websocket Closed - Reconnecting")
continue
except Exception:
logger.exception("Exception in Websocket - Reconnecting")
continue
async def close(self):
self.connected.clear()
if self._ping_task is not None:
self._ping_task.cancel()
if self._ws is not None:
await self._ws.close()
async def _ping_loop(self):
await self.connected.wait()
ping_every = PING_INTERVAL.total_seconds()
while self.connected.is_set():
try:
await asyncio.wait_for(self.send({"type": "PING"}), timeout=10)
except asyncio.TimeoutError:
# per documentation, if there's no response for a PING, reconnect to the websocket
logger.warning("Websocket got no response to PING - reconnect")
self.reconnect.set()
break
await asyncio.sleep(ping_every)
def create_nonce(self, length: int = 30) -> str:
available_chars = string.ascii_letters + string.digits
return ''.join(random.choices(available_chars, k=length))
def send(self, message: Dict[str, Any]) -> asyncio.Future[Dict[str, Any]]:
logger.debug(f"Websocket sending: {message}")
msg_type = message["type"]
if msg_type == "PING":
nonce = "PING"
else:
nonce = self.create_nonce()
self._send_queue.put_nowait((nonce, message))
future: asyncio.Future[Dict[str, Any]] = asyncio.get_running_loop().create_future()
self._recv_dict[nonce] = future
return future
async def add_topics(self, topics: Iterable[WebsocketTopic]):
# ensure no topics end up duplicated
topics = set(topics)
topics.difference_update(self._topics)
if not topics:
# none left to add
return
self._topics.update(topics)
if len(self._topics) >= 50:
# TODO: Handle multiple connections (up to 10) since one allows only up to 50 topics
raise MinerException("Too many topics")
await self.connected.wait()
assert self._twitch._access_token is not None
auth_token = self._twitch._access_token
topics_list = list(map(str, topics))
logger.info(f"Listening for: {', '.join(topics_list)}")
await self.send(
{
"type": "LISTEN",
"data": {
"topics": topics_list,
"auth_token": auth_token,
}
}
)
async def process_drops(self, message: Dict[str, Any]):
drop_id = message["data"]["drop_id"]
drop: Optional[TimedDrop] = None
for campaign in self._twitch.inventory:
drop = campaign.get_drop(drop_id)
if drop is not None:
break
else:
logger.error(f"Drop with ID of {drop_id} not found!")
return
drop.update(message)
msg_type = message["type"]
if msg_type == "drop-progress":
print(
f"Drop progress: {drop.progress:4.0%} ({drop.remaining_minutes} minutes remaining)"
)
elif msg_type == "drop-claim":
await drop.claim()
print(f"Claimed drop: {', '.join(drop.rewards)}")
async def process_points(self, message: Dict[str, Any]):
msg_type = message["type"]
if msg_type == "points-earned":
points = message["data"]["point_gain"]["total_points"]
balance = message["data"]["balance"]["balance"]
print(f"Earned points for watching: {points:3}, total: {balance}")
elif msg_type == "claim-available":
claim_data = message["data"]["claim"]
points = claim_data["point_gain"]["total_points"]
await self._twitch.claim_points(claim_data["channel_id"], claim_data["id"])
print(f"Claimed bonus points: {points}")