Made the settings file optional;

default config creation removed, instead fetches a list of all live channels for the current drop campaign's games
This commit is contained in:
DevilXD
2021-12-04 14:16:25 +01:00
parent 947fd6f148
commit fcd9e5b52d
7 changed files with 114 additions and 48 deletions

View File

@@ -11,7 +11,7 @@ from typing import Any, Optional, Dict, TYPE_CHECKING
from inventory import Game
from exceptions import MinerException
from constants import BASE_URL, GQL_OPERATIONS, ONLINE_DELAY
from constants import BASE_URL, GQL_OPERATIONS, ONLINE_DELAY, DROPS_ENABLED_TAG
if TYPE_CHECKING:
from twitch import Twitch
@@ -27,12 +27,25 @@ class Stream:
stream = data["stream"]
self.broadcast_id = int(stream["id"])
self.viewer_count = stream["viewersCount"]
self.drops_enabled = any(tag["localizedName"] == "Drops Enabled" for tag in stream["tags"])
self.drops_enabled = any(tag["id"] == DROPS_ENABLED_TAG for tag in stream["tags"])
settings = data["broadcastSettings"]
self.game: Game = Game(settings["game"])
self.title = settings["title"]
self._timestamp = datetime.now(timezone.utc)
@classmethod
def from_directory(cls, channel: Channel, data: Dict[str, Any]):
self = super().__new__(cls)
self._twitch = channel._twitch
self.channel = channel
self.broadcast_id = data["id"]
self.viewer_count = data["viewersCount"]
self.drops_enabled = any(tag["id"] == DROPS_ENABLED_TAG for tag in data["tags"])
self.game = Game(data["game"])
self.title = data["title"]
self._timestamp = datetime.now(timezone.utc)
return self
class Channel:
async def __new__(cls, *args, **kwargs):
@@ -54,6 +67,19 @@ class Channel:
self._pending_stream_up: Optional[asyncio.Task[Any]] = None
await self.get_stream()
@classmethod
async def from_directory(cls, twitch: Twitch, data: Dict[str, Any]):
self = super().__new__(cls)
self._twitch = twitch
channel = data["broadcaster"]
self.id = channel["id"]
self.name = channel["displayName"]
self.url = f"{BASE_URL}/{self.name}"
self._spade_url = await self.get_spade_url()
self.stream = Stream.from_directory(self, data)
self._pending_stream_up = None
return self
def __eq__(self, other: object):
if isinstance(other, self.__class__):
return self.id == other.id

View File

@@ -15,6 +15,8 @@ SETTINGS_PATH = "settings.json"
COOKIES_PATH = "cookies.pickle"
PING_INTERVAL = timedelta(minutes=3)
ONLINE_DELAY = timedelta(seconds=30)
# tags
DROPS_ENABLED_TAG = "c2542d6d-cd10-4532-919b-3d19f30a768b"
class GQLOperation(Dict[str, Any]):
@@ -29,11 +31,15 @@ class GQLOperation(Dict[str, Any]):
}
)
if variables is not None:
super().__setitem__("variables", variables)
self.__setitem__("variables", variables)
def with_variables(self, variables: Dict[str, Any]):
modified = copy(self)
modified["variables"] = variables
if "variables" in self:
existing_variables: Dict[str, Any] = modified["variables"]
existing_variables.update(variables)
else:
modified["variables"] = variables
return modified
@@ -58,10 +64,6 @@ GQL_OPERATIONS: Dict[str, GQLOperation] = {
"ChannelPointsContext",
"9988086babc615a918a1e9a722ff41d98847acac822645209ac7379eecb27152",
),
"ModViewChannelQuery": GQLOperation(
"ModViewChannelQuery",
"df5d55b6401389afb12d3017c9b2cf1237164220c8ef4ed754eae8188068a807",
),
"Inventory": GQLOperation( # used
"Inventory",
"e0765ebaa8e8eeb4043cc6dfeab3eac7f682ef5f724b81367e6e55c7aef2be4c",
@@ -78,11 +80,6 @@ GQL_OPERATIONS: Dict[str, GQLOperation] = {
"DropsHighlightService_AvailableDrops",
"b19ee96a0e79e3f8281c4108bc4c7b3f232266db6f96fd04a339ab393673a075",
),
# Use for replace https://api.twitch.tv/helix/users?login={self.username}
"ReportMenuItem": GQLOperation(
"ReportMenuItem",
"8f3628981255345ca5e5453dfd844efffb01d6413a9931498836e6268692a30c",
),
"PersonalSections": GQLOperation(
"PersonalSections",
"9fbdfb00156f754c26bde81eb47436dee146655c92682328457037da1a48ed39",
@@ -96,6 +93,22 @@ GQL_OPERATIONS: Dict[str, GQLOperation] = {
"creatorAnniversariesExperimentEnabled": False,
},
),
"GameDirectory": GQLOperation(
"DirectoryPage_Game",
"d5c5df7ab9ae65c3ea0f225738c08a36a4a76e4c6c31db7f8c4b8dc064227f9e",
variables={
"limit": 40,
"name": "paladins",
"options": {
"includeRestricted": ["SUB_ONLY_LIVE"],
"recommendationsContext": {"platform": "web"},
"sort": "RELEVANCE",
"tags": [],
"requestID": "JIRA-VXP-2397",
},
"sortTypeIsRecency": False
},
),
}

4
example_settings.json Normal file
View File

@@ -0,0 +1,4 @@
{
"username": "YourTwitchUsername",
"channels": ["Channel1", "Channel2", "Channel3"]
}

View File

@@ -68,6 +68,9 @@ class BaseDrop:
def can_claim(self) -> bool:
return self.claim_id is not None and not self.is_claimed
def rewards_text(self, delim: str = ", ") -> str:
return delim.join(self.rewards)
async def claim(self) -> bool:
"""
Returns True if the claim succeeded, False otherwise.

33
main.py
View File

@@ -2,7 +2,6 @@ from __future__ import annotations
__version__ = 2
import os
import sys
import json
import ctypes
@@ -56,36 +55,20 @@ handler.setFormatter(logging.Formatter("{levelname}: {message}", style='{'))
logger.addHandler(handler)
logger.setLevel(logging_level)
# handle settings
if not os.path.isfile(SETTINGS_PATH):
default = {
"username": "YourTwitchUsername",
"password": None,
"channels": ["Channel1", "Channel2", "Channel3"],
}
with open(SETTINGS_PATH, 'w', encoding="utf8") as file:
json.dump(default, file, indent=4)
print(
f"File '{SETTINGS_PATH}' created.\n"
"Please modify default settings as necessary, then relaunch the application."
)
terminate()
with open(SETTINGS_PATH, 'r', encoding="utf8") as file:
try:
try:
with open(SETTINGS_PATH, 'r', encoding="utf8") as file:
settings: Dict[str, Any] = json.load(file)
except json.JSONDecodeError as exc:
print(f"Error while reading the settings file:\n{str(exc)}")
terminate()
required_fields = ["channels"]
for field_name in required_fields:
if field_name not in settings:
print(f"Field '{field_name}' is a required field in '{SETTINGS_PATH}'")
terminate()
except json.JSONDecodeError as exc:
print(f"Error while reading the settings file:\n{str(exc)}")
terminate()
except FileNotFoundError:
settings = {}
# asyncio loop
loop = asyncio.get_event_loop()
# client init
client = Twitch(settings.get("username"), settings.get("password"))
# main task and it's close event
main_task = loop.create_task(client.run(settings["channels"]))
main_task = loop.create_task(client.run(settings.get("channels")))
close_event = threading.Event()

View File

@@ -6,7 +6,7 @@ import logging
from yarl import URL
from getpass import getpass
from functools import partial
from typing import Any, Optional, Union, List, Dict, cast
from typing import Any, Optional, Union, List, Dict, Collection, cast
try:
import aiohttp
@@ -15,7 +15,7 @@ except ImportError:
from channel import Channel
from websocket import Websocket
from inventory import DropsCampaign
from inventory import DropsCampaign, Game
from exceptions import LoginException, CaptchaRequired
from constants import (
CLIENT_ID,
@@ -24,6 +24,7 @@ from constants import (
AUTH_URL,
GQL_URL,
GQL_OPERATIONS,
DROPS_ENABLED_TAG,
GQLOperation,
WebsocketTopic,
get_topic,
@@ -77,7 +78,7 @@ class Twitch:
def is_currently_watching(self, channel: Channel) -> bool:
return self._watching_channel is not None and self._watching_channel == channel
async def run(self, channels: List[str] = []):
async def run(self, channel_names: Optional[List[str]] = None):
"""
Main method that runs the whole client.
@@ -100,10 +101,22 @@ class Twitch:
games.add(campaign.game)
if drop.can_claim:
await drop.claim()
# Fetch information about all channels we're supposed to handle
for channel_name in channels:
channel: Channel = await Channel(self, channel_name) # type: ignore
self.channels[channel.id] = channel
# games now has all games we want to farm drops for
if not channel_names:
# get a list of all channels with drops enabled
print("Fetching suitable live channels to watch...")
live_streams: Dict[Game, List[Channel]] = await self.get_live_streams(
games, [DROPS_ENABLED_TAG]
)
for game, channels in live_streams.items():
for channel in channels:
self.channels[channel.id] = channel
print(f"Added channel: {channel.name} for game: {game.name}")
else:
# Fetch information about all channels we're supposed to handle
for channel_name in channel_names:
channel: Channel = await Channel(self, channel_name) # type: ignore
self.channels[channel.id] = channel
# Sub to these channel updates
topics: List[WebsocketTopic] = []
for channel_id in self.channels:
@@ -168,7 +181,8 @@ class Twitch:
i = (i + 1) % 30
await asyncio.sleep(59)
print(f"Watching: {channel.name}")
game = channel.stream.game.name if channel.stream is not None else "<Unknown>"
print(f"Watching: {channel.name}, game: {game}")
self._watching_channel = channel
self._watching_task = asyncio.create_task(watcher(channel))
@@ -352,6 +366,28 @@ class Twitch:
inventory = response["data"]["currentUser"]["inventory"]
return [DropsCampaign(self, data) for data in inventory["dropCampaignsInProgress"]]
async def get_live_streams(
self, games: Collection[Game], tag_ids: List[str]
) -> Dict[Game, List[Channel]]:
limit = int(45 / len(games))
live_streams = {}
for game in games:
response = await self.gql_request(
GQL_OPERATIONS["GameDirectory"].with_variables({
"limit": limit,
"name": game.name,
"options": {
"includeRestricted": ["SUB_ONLY_LIVE"],
"tags": tag_ids,
},
})
)
live_streams[game] = [
await Channel.from_directory(self, stream_channel_data["node"])
for stream_channel_data in response["data"]["game"]["streams"]["edges"]
]
return live_streams
async def claim_points(self, channel_id: Union[str, int], claim_id: str):
variables = {"input": {"channelID": str(channel_id), "claimID": claim_id}}
await self.gql_request(

View File

@@ -197,11 +197,12 @@ class Websocket:
msg_type = message["type"]
if msg_type == "drop-progress":
print(
f"Drop progress: {drop.progress:4.0%} ({drop.remaining_minutes} minutes remaining)"
f"Drop: {drop.rewards_text()}: {drop.progress:4.0%} "
f"({drop.remaining_minutes} minutes remaining)"
)
elif msg_type == "drop-claim":
await drop.claim()
print(f"Claimed drop: {', '.join(drop.rewards)}")
print(f"Claimed drop: {drop.rewards_text()}")
async def process_points(self, message: Dict[str, Any]):
msg_type = message["type"]