feat: add diagnostic scripts to collect connection and signaling metadata for Telemost, WB Stream, and SaluteJazz

This commit is contained in:
zarazaex69
2026-04-20 06:03:22 +03:00
parent af9fd436e4
commit dfafa66d19
3 changed files with 200 additions and 0 deletions

59
code/jazz_info.py Normal file
View File

@@ -0,0 +1,59 @@
#!/usr/bin/env python3
import asyncio
import json
import uuid
import aiohttp
API_BASE = "https://bk.salutejazz.ru"
JAZZ_HEADERS = {"X-Jazz-ClientId": str(uuid.uuid4()), "X-Jazz-AuthType": "ANONYMOUS", "X-Client-AuthType": "ANONYMOUS", "Content-Type": "application/json"}
async def get_jazz_info():
print("\n--- SaluteJazz Info ---")
timeout = aiohttp.ClientTimeout(total=15)
async with aiohttp.ClientSession(timeout=timeout) as session:
print("[1/4] API Initialization...")
try:
r = await session.post(f"{API_BASE}/room/create-meeting", headers=JAZZ_HEADERS, json={"title": "InfoBot", "guestEnabled": True, "lobbyEnabled": False, "room3dEnabled": False})
rj = await r.json()
print(" :P Room created")
print(json.dumps(rj, indent=2))
r2 = await session.post(f"{API_BASE}/room/{rj['roomId']}/preconnect", headers=JAZZ_HEADERS, json={"password": rj["password"], "jazzNextMigration": {"b2bBaseRoomSupport": True, "sdkRoomSupport": True, "mediaWithoutAutoSubscribeSupport": True}})
r2j = await r2.json()
print(" :P Preconnect info received")
print(json.dumps(r2j, indent=2))
conn_url = r2j['connectorUrl']
except Exception as e:
print(f" X Error: {e}"); return
print(f"\n[2/4] Connecting to signaling...")
async with session.ws_connect(conn_url) as ws:
await ws.send_json({"roomId": rj["roomId"], "event": "join", "requestId": str(uuid.uuid4()), "payload": {"password": rj["password"], "participantName": "InfoBot", "supportedFeatures": {"attachedRooms": True}, "isSilent": False}})
print(" :P Signaling established")
print("\n[3/4] Collecting network & media details...")
end = asyncio.get_event_loop().time() + 8
while asyncio.get_event_loop().time() < end:
try:
m = await asyncio.wait_for(ws.receive(), 1)
if m.type == aiohttp.WSMsgType.TEXT:
d = json.loads(m.data); ev = d.get("event", ""); p = d.get("payload", {}); meth = p.get("method", "")
print(f" -> Event: {ev}{' ('+meth+')' if meth else ''}")
if meth == "rtc:config":
print("\n--- ICE Servers ---")
print(json.dumps(p.get("configuration", {}).get("iceServers", []), indent=2))
elif meth == "rtc:offer":
print("\n--- SDP Offer (Codecs & Quality) ---")
print(p.get("description", {}).get("sdp", ""))
elif ev == "join-response":
print("\n--- Participant Group ---")
print(json.dumps(p.get("participantGroup", {}), indent=2))
else:
print(json.dumps(p, indent=2))
except: continue
print("\n--- INFO COLLECTION COMPLETE ---")
if __name__ == "__main__":
try: asyncio.run(get_jazz_info())
except KeyboardInterrupt: pass

58
code/telemost_info.py Normal file
View File

@@ -0,0 +1,58 @@
#!/usr/bin/env python3
import asyncio
import json
import uuid
import aiohttp
from urllib.parse import quote
API_BASE = "https://cloud-api.yandex.ru/telemost_front/v2/telemost"
CONFERENCE_ID = "75047680642749"
ROOM_URL = f"https://telemost.yandex.ru/j/{CONFERENCE_ID}"
async def get_telemost_info():
print("\n--- Yandex Telemost Info ---")
async with aiohttp.ClientSession() as session:
print(f"[1/3] Fetching connection info...")
url = f"{API_BASE}/conferences/{quote(ROOM_URL, safe='')}/connection"
params = {"next_gen_media_platform_allowed": "true", "display_name": "InfoBot", "waiting_room_supported": "true"}
headers = {"User-Agent": "Mozilla/5.0 (Linux x86_64)", "Client-Instance-Id": str(uuid.uuid4()), "X-Telemost-Client-Version": "187.1.0", "idempotency-key": str(uuid.uuid4())}
try:
async with session.get(url, params=params, headers=headers) as resp:
if resp.status != 200: print(f" X API Fail: {resp.status}"); return
conn_info = await resp.json()
print(" :P Connection data received")
print(json.dumps(conn_info, indent=2))
except Exception as e: print(f" X Error: {e}"); return
print(f"\n[2/3] Connecting to signaling...")
try:
async with session.ws_connect(conn_info["client_configuration"]["media_server_url"]) as ws:
await ws.send_json({"uid": str(uuid.uuid4()), "hello": {"participantMeta": {"name": "InfoBot", "role": "SPEAKER", "sendAudio": False, "sendVideo": False}, "participantAttributes": {"name": "InfoBot", "role": "SPEAKER"}, "sendAudio": False, "sendVideo": False, "sendSharing": False, "participantId": conn_info["peer_id"], "roomId": conn_info["room_id"], "serviceName": "telemost", "credentials": conn_info["credentials"], "capabilitiesOffer": {"offerAnswerMode": ["SEPARATE"], "initialSubscriberOffer": ["ON_HELLO"], "slotsMode": ["FROM_CONTROLLER"], "simulcastMode": ["DISABLED"], "selfVadStatus": ["FROM_SERVER"], "dataChannelSharing": ["TO_RTP"]}, "sdkInfo": {"implementation": "python", "version": "1.0.0", "userAgent": "OlcRTC-InfoBot"}, "sdkInitializationId": str(uuid.uuid4()), "disablePublisher": False, "disableSubscriber": False}})
print(" :P Signaling established")
print("\n[3/3] Collecting media details...")
end = asyncio.get_event_loop().time() + 8
while asyncio.get_event_loop().time() < end:
try:
m = await asyncio.wait_for(ws.receive(), 1)
if m.type == aiohttp.WSMsgType.TEXT:
d = json.loads(m.data); uid = d.get("uid")
print(f" -> Message: {list(d.keys())}")
if "serverHello" in d:
print("\n--- Server Hello / Telemetry ---")
print(json.dumps(d["serverHello"], indent=2))
elif "subscriberSdpOffer" in d:
print("\n--- SDP Offer (Codecs & Quality) ---")
print(d["subscriberSdpOffer"].get("sdp"))
elif "webrtcIceCandidate" in d:
print(f" -> ICE: {d['webrtcIceCandidate'].get('candidate')}")
if uid: await ws.send_json({"uid": uid, "ack": {"status": {"code": "OK"}}})
except: continue
except Exception as e: print(f" X Signaling Fail: {e}")
print("\n--- INFO COLLECTION COMPLETE ---")
if __name__ == "__main__":
try: asyncio.run(get_telemost_info())
except KeyboardInterrupt: pass

83
code/wbstream_info.py Normal file
View File

@@ -0,0 +1,83 @@
#!/usr/bin/env python3
import asyncio
import json
import requests
from livekit import rtc
API_BASE = "https://stream.wb.ru"
WS_URL = "wss://wbstream01-el.wb.ru:7880"
def _get_room_token(room_id: str, display_name: str) -> tuple[str, str]:
headers = {"User-Agent": "Mozilla/5.0 (Linux x86_64)", "Content-Type": "application/json"}
print("[1/3] API Initialization...")
reg_req = requests.post(f"{API_BASE}/auth/api/v1/auth/user/guest-register", json={"displayName": display_name, "device": {"deviceName": "Linux", "deviceType": "PARTICIPANT_DEVICE_TYPE_WEB_DESKTOP"}}, headers=headers)
reg_req.raise_for_status()
auth_data = reg_req.json()
print(" :P Guest registered")
print(json.dumps(auth_data, indent=2))
headers["Authorization"] = f"Bearer {auth_data['accessToken']}"
if not room_id:
print("\n[2/3] Room Preparation...")
room_req = requests.post(f"{API_BASE}/api-room/api/v2/room", json={"roomType": "ROOM_TYPE_ALL_ON_SCREEN", "roomPrivacy": "ROOM_PRIVACY_FREE"}, headers=headers)
room_req.raise_for_status()
room_data = room_req.json()
print(" :P Room created")
print(json.dumps(room_data, indent=2))
room_id = room_data["roomId"]
print(f"\n[3/3] Fetching LiveKit token...")
requests.post(f"{API_BASE}/api-room/api/v1/room/{room_id}/join", json={}, headers=headers).raise_for_status()
tok_req = requests.get(f"{API_BASE}/api-room-manager/api/v1/room/{room_id}/token", params={"deviceType": "PARTICIPANT_DEVICE_TYPE_WEB_DESKTOP", "displayName": display_name}, headers=headers)
tok_req.raise_for_status()
token_data = tok_req.json()
print(" :P Token received")
print(json.dumps(token_data, indent=2))
return room_id, token_data["roomToken"]
async def get_wb_info():
print("\n--- WB Stream Info ---")
try:
room_id, token = _get_room_token("", "InfoBot")
except Exception as e:
print(f" X Auth failed: {e}"); return
room = rtc.Room()
@room.on("participant_connected")
def on_participant_connected(p):
print(f" -> Participant Connected: {p.identity} | Metadata: {p.metadata}")
@room.on("track_subscribed")
def on_track_subscribed(t, pub, p):
print(f" -> Track Subscribed: {pub.name} ({t.kind}) from {p.identity}")
print(f"\nConnecting to LiveKit: {WS_URL}")
try:
await room.connect(WS_URL, token)
print(" :P Connected to room")
print(f"\n--- Room State ---")
sid = room.sid
if asyncio.iscoroutine(sid): sid = await sid
print(f"Room: {room.name} (SID: {sid})")
print(f"Metadata: {room.metadata}")
print("\nRemote Participants:")
for sid, p in room.remote_participants.items():
print(f" - {p.identity} | Metadata: {p.metadata}")
for tsid, t in p.track_publications.items(): print(f" * Track {t.name}: {t.kind}")
print("\nWaiting 8s for events...")
await asyncio.sleep(8)
except Exception as e: print(f" X Connection error: {e}")
finally: await room.disconnect()
print("\n--- INFO COLLECTION COMPLETE ---")
if __name__ == "__main__":
try: asyncio.run(get_wb_info())
except KeyboardInterrupt: pass