From 8f2c18bccfb94f31844f436d5fe09f96051924c9 Mon Sep 17 00:00:00 2001 From: zarazaex69 Date: Tue, 14 Apr 2026 01:32:28 +0300 Subject: [PATCH] chore(code): remove legacy implementation files --- code/dcsend.py | 415 ------------------ code/dcstream.py | 455 ------------------- code/flood.py | 144 ------- code/info.py | 565 ------------------------ code/invicible.py | 592 ------------------------- code/limits.py | 1054 --------------------------------------------- code/olcrtc.py | 573 ------------------------ code/vcsend.py | 625 --------------------------- 8 files changed, 4423 deletions(-) delete mode 100755 code/dcsend.py delete mode 100755 code/dcstream.py delete mode 100755 code/flood.py delete mode 100755 code/info.py delete mode 100755 code/invicible.py delete mode 100755 code/limits.py delete mode 100644 code/olcrtc.py delete mode 100755 code/vcsend.py diff --git a/code/dcsend.py b/code/dcsend.py deleted file mode 100755 index 4540590..0000000 --- a/code/dcsend.py +++ /dev/null @@ -1,415 +0,0 @@ -#!/usr/bin/env python3 - -import asyncio -import json -import uuid -import websockets -import requests -from urllib.parse import quote -from aiortc import RTCPeerConnection, RTCSessionDescription, RTCIceCandidate, RTCConfiguration, RTCIceServer - -CONFERENCE_ID = "75047680642749" -CONFERENCE_URL = f"https://telemost.yandex.ru/j/{CONFERENCE_ID}" -API_BASE = "https://cloud-api.yandex.ru/telemost_front/v2/telemost" - -CHUNK_SIZE = 7168 -HEADER_SIZE = 1024 - -def generate_uuid(): - return str(uuid.uuid4()) - -def get_connection_info(display_name): - url = f"{API_BASE}/conferences/{quote(CONFERENCE_URL, safe='')}/connection" - params = { - "next_gen_media_platform_allowed": "true", - "display_name": display_name, - "waiting_room_supported": "true" - } - - headers = { - "User-Agent": "Mozilla/5.0 (X11; Linux x86_64; rv:149.0) Gecko/20100101 Firefox/149.0", - "Accept": "*/*", - "content-type": "application/json", - "Client-Instance-Id": generate_uuid(), - "X-Telemost-Client-Version": "187.1.0", - "idempotency-key": generate_uuid(), - "Origin": "https://telemost.yandex.ru", - "Referer": "https://telemost.yandex.ru/" - } - - response = requests.get(url, params=params, headers=headers) - response.raise_for_status() - return response.json() - -def chunk_data(data, transfer_id): - total_size = len(data) - chunk_count = (total_size + CHUNK_SIZE - 1) // CHUNK_SIZE - packets = [] - - for i in range(chunk_count): - start = i * CHUNK_SIZE - end = min(start + CHUNK_SIZE, total_size) - chunk = data[start:end] - - header = {"tid": transfer_id, "idx": i, "total": chunk_count, "size": total_size} - header_json = json.dumps(header).encode() - header_padded = header_json.ljust(HEADER_SIZE, b'\x00') - - packets.append(header_padded + chunk) - - return packets - -class ChunkedReceiver: - def __init__(self): - self.buffers = {} - self.completed = {} - - def handle_chunk(self, packet): - if len(packet) < HEADER_SIZE: - return None - - header_bytes = packet[:HEADER_SIZE].rstrip(b'\x00') - chunk_data = packet[HEADER_SIZE:] - - try: - header = json.loads(header_bytes) - except: - return None - - tid = header["tid"] - idx = header["idx"] - total = header["total"] - - if tid not in self.buffers: - self.buffers[tid] = {"chunks": {}, "total": total, "received": 0} - - buf = self.buffers[tid] - - if idx not in buf["chunks"]: - buf["chunks"][idx] = chunk_data - buf["received"] += 1 - - if buf["received"] == buf["total"]: - complete = b"".join(buf["chunks"][i] for i in range(buf["total"])) - self.completed[tid] = complete - del self.buffers[tid] - return tid - - return None - -async def create_peer(name, is_server=False): - conn_info = get_connection_info(name) - room_id = conn_info["room_id"] - peer_id = conn_info["peer_id"] - credentials = conn_info["credentials"] - ws_url = conn_info["client_configuration"]["media_server_url"] - - pc_sub = RTCPeerConnection(RTCConfiguration( - iceServers=[RTCIceServer(urls=["stun:stun.rtc.yandex.net:3478"])] - )) - - pc_pub = RTCPeerConnection(RTCConfiguration( - iceServers=[RTCIceServer(urls=["stun:stun.rtc.yandex.net:3478"])] - )) - - dc_pub = pc_pub.createDataChannel("dcsend", ordered=True) - dc_pub_open = asyncio.Event() - - receiver = ChunkedReceiver() - - stats = { - "sent": 0, - "received": 0, - "bytes_sent": 0, - "bytes_received": 0, - "messages": [] - } - - @dc_pub.on("open") - def on_pub_open(): - dc_pub_open.set() - - @dc_pub.on("message") - def on_pub_msg(msg): - if isinstance(msg, str): - stats["messages"].append(("text", msg)) - else: - tid = receiver.handle_chunk(msg) - if tid and tid in receiver.completed: - data = receiver.completed[tid] - stats["messages"].append(("data", data)) - del receiver.completed[tid] - - @pc_sub.on("datachannel") - def on_sub_dc(channel): - @channel.on("message") - def on_message(message): - if isinstance(message, str): - stats["messages"].append(("text", message)) - - if is_server and message.startswith("GET "): - url = message[4:].strip() - asyncio.create_task(handle_request(url, dc_pub, stats)) - else: - tid = receiver.handle_chunk(message) - if tid and tid in receiver.completed: - data = receiver.completed[tid] - stats["messages"].append(("data", data)) - del receiver.completed[tid] - - ws = await websockets.connect(ws_url) - - hello_msg = { - "uid": generate_uuid(), - "hello": { - "participantMeta": {"name": name, "role": "SPEAKER", "sendAudio": False, "sendVideo": False}, - "participantAttributes": {"name": name, "role": "SPEAKER"}, - "sendAudio": False, - "sendVideo": False, - "sendSharing": False, - "participantId": peer_id, - "roomId": room_id, - "serviceName": "telemost", - "credentials": credentials, - "capabilitiesOffer": { - "offerAnswerMode": ["SEPARATE"], - "initialSubscriberOffer": ["ON_HELLO"], - "slotsMode": ["FROM_CONTROLLER"], - "simulcastMode": ["DISABLED"], - "selfVadStatus": ["FROM_SERVER"], - "dataChannelSharing": ["TO_RTP"] - }, - "sdkInfo": {"implementation": "python", "version": "1.0.0", "userAgent": f"DCSend-{name}"}, - "sdkInitializationId": generate_uuid(), - "disablePublisher": False, - "disableSubscriber": False - } - } - - await ws.send(json.dumps(hello_msg)) - - publisher_sdp_sent = False - - async def ws_handler(): - nonlocal publisher_sdp_sent - while True: - try: - data = json.loads(await ws.recv()) - - if "serverHello" in data: - await ws.send(json.dumps({"uid": data["uid"], "ack": {"status": {"code": "OK"}}})) - - if "subscriberSdpOffer" in data and not publisher_sdp_sent: - await pc_sub.setRemoteDescription(RTCSessionDescription( - sdp=data["subscriberSdpOffer"]["sdp"], type="offer" - )) - - answer = await pc_sub.createAnswer() - await pc_sub.setLocalDescription(answer) - - await ws.send(json.dumps({ - "uid": generate_uuid(), - "subscriberSdpAnswer": { - "pcSeq": data["subscriberSdpOffer"]["pcSeq"], - "sdp": pc_sub.localDescription.sdp - } - })) - - await ws.send(json.dumps({"uid": data["uid"], "ack": {"status": {"code": "OK"}}})) - await asyncio.sleep(0.3) - - pub_offer = await pc_pub.createOffer() - await pc_pub.setLocalDescription(pub_offer) - - await ws.send(json.dumps({ - "uid": generate_uuid(), - "publisherSdpOffer": { - "pcSeq": 1, - "sdp": pc_pub.localDescription.sdp - } - })) - publisher_sdp_sent = True - - if "publisherSdpAnswer" in data: - await pc_pub.setRemoteDescription(RTCSessionDescription( - sdp=data["publisherSdpAnswer"]["sdp"], type="answer" - )) - await ws.send(json.dumps({"uid": data["uid"], "ack": {"status": {"code": "OK"}}})) - - if "webrtcIceCandidate" in data: - cand = data["webrtcIceCandidate"] - try: - parts = cand["candidate"].split() - if len(parts) >= 8: - ice = RTCIceCandidate( - component=int(parts[1]), - foundation=parts[0].replace("candidate:", ""), - ip=parts[4], - port=int(parts[5]), - priority=int(parts[3]), - protocol=parts[2], - type=parts[7], - sdpMid=cand["sdpMid"], - sdpMLineIndex=cand["sdpMlineIndex"] - ) - - if cand.get("target") == "SUBSCRIBER": - await pc_sub.addIceCandidate(ice) - elif cand.get("target") == "PUBLISHER": - await pc_pub.addIceCandidate(ice) - except: - pass - - except: - break - - @pc_sub.on("icecandidate") - async def on_sub_ice(event): - if event.candidate: - await ws.send(json.dumps({ - "uid": generate_uuid(), - "webrtcIceCandidate": { - "candidate": event.candidate.candidate, - "sdpMid": event.candidate.sdpMid, - "sdpMlineIndex": event.candidate.sdpMLineIndex, - "target": "SUBSCRIBER", - "pcSeq": 1 - } - })) - - @pc_pub.on("icecandidate") - async def on_pub_ice(event): - if event.candidate: - await ws.send(json.dumps({ - "uid": generate_uuid(), - "webrtcIceCandidate": { - "candidate": event.candidate.candidate, - "sdpMid": event.candidate.sdpMid, - "sdpMlineIndex": event.candidate.sdpMLineIndex, - "target": "PUBLISHER", - "pcSeq": 1 - } - })) - - ws_task = asyncio.create_task(ws_handler()) - - return { - "name": name, - "dc_pub": dc_pub, - "dc_pub_open": dc_pub_open, - "stats": stats, - "ws": ws, - "ws_task": ws_task, - "pc_sub": pc_sub, - "pc_pub": pc_pub - } - -async def handle_request(url, dc, stats): - try: - if not url.startswith(('http://', 'https://')): - url = 'https://' + url - - print(f" -> Fetching {url}...") - response = requests.get(url, timeout=10) - response.raise_for_status() - - data = response.content - print(f" -> Got {len(data)} bytes, sending...") - - transfer_id = generate_uuid() - packets = chunk_data(data, transfer_id) - - for packet in packets: - while dc.bufferedAmount > CHUNK_SIZE * 2: - await asyncio.sleep(0.001) - dc.send(packet) - stats["sent"] += 1 - stats["bytes_sent"] += len(packet) - - print(f" :P Sent {len(packets)} chunks") - - except Exception as e: - print(f" X Error: {e}") - try: - dc.send(f"ERROR: {str(e)}") - except: - pass - -async def run_dcsend(): - print(r""" - DCSend - DataChannel Transfer - Request/Response over Yandex Telemost SFU - by zowue for olc -""") - - print("[1/3] Creating server peer...") - try: - server = await create_peer("Server", is_server=True) - await asyncio.wait_for(server["dc_pub_open"].wait(), timeout=10.0) - print(" :P Server ready") - except Exception as e: - print(f" X Error: {e}") - return - - print("\n[2/3] Creating client peer...") - try: - client = await create_peer("Client", is_server=False) - await asyncio.wait_for(client["dc_pub_open"].wait(), timeout=10.0) - print(" :P Client ready") - except Exception as e: - print(f" X Error: {e}") - return - - print("\n[3/3] Starting transfer...") - await asyncio.sleep(2) - - url = "zarazaex.xyz/curl.txt" - print(f" -> Client requesting: {url}") - - client["dc_pub"].send(f"GET {url}") - - print(" -> Waiting for response...") - - for _ in range(30): - await asyncio.sleep(0.5) - - for msg_type, msg_data in client["stats"]["messages"]: - if msg_type == "data": - print(f"\n :P Received {len(msg_data)} bytes") - print("\n--- Response Content ---") - try: - print(msg_data.decode('utf-8')) - except: - print(f"[Binary data: {len(msg_data)} bytes]") - print("--- End ---\n") - - client["stats"]["messages"].clear() - - print("\nCleaning up...") - server["ws_task"].cancel() - client["ws_task"].cancel() - await server["ws"].close() - await client["ws"].close() - await server["pc_sub"].close() - await server["pc_pub"].close() - await client["pc_sub"].close() - await client["pc_pub"].close() - - print(":P Transfer complete") - return - - elif msg_type == "text" and msg_data.startswith("ERROR"): - print(f"\n X {msg_data}\n") - client["stats"]["messages"].clear() - return - - print("\n X Timeout waiting for response") - -async def main(): - await run_dcsend() - -if __name__ == "__main__": - try: - asyncio.run(main()) - except KeyboardInterrupt: - print("\n\nTransfer interrupted.") diff --git a/code/dcstream.py b/code/dcstream.py deleted file mode 100755 index febf30e..0000000 --- a/code/dcstream.py +++ /dev/null @@ -1,455 +0,0 @@ -#!/usr/bin/env python3 - -import asyncio -import json -import uuid -import websockets -import requests -from urllib.parse import quote -from aiortc import RTCPeerConnection, RTCSessionDescription, RTCIceCandidate, RTCConfiguration, RTCIceServer - -CONFERENCE_ID = "75047680642749" -CONFERENCE_URL = f"https://telemost.yandex.ru/j/{CONFERENCE_ID}" -API_BASE = "https://cloud-api.yandex.ru/telemost_front/v2/telemost" - -CHUNK_SIZE = 8152 -BUFFER_THRESHOLD = 262144 -SEND_DELAY = 0.0005 - -def generate_uuid(): - return str(uuid.uuid4()) - -def get_connection_info(display_name): - url = f"{API_BASE}/conferences/{quote(CONFERENCE_URL, safe='')}/connection" - params = { - "next_gen_media_platform_allowed": "true", - "display_name": display_name, - "waiting_room_supported": "true" - } - - headers = { - "User-Agent": "Mozilla/5.0 (X11; Linux x86_64; rv:149.0) Gecko/20100101 Firefox/149.0", - "Accept": "*/*", - "content-type": "application/json", - "Client-Instance-Id": generate_uuid(), - "X-Telemost-Client-Version": "187.1.0", - "idempotency-key": generate_uuid(), - "Origin": "https://telemost.yandex.ru", - "Referer": "https://telemost.yandex.ru/" - } - - response = requests.get(url, params=params, headers=headers) - response.raise_for_status() - return response.json() - -class StreamReceiver: - def __init__(self): - self.streams = {} - self.data = {} - - def handle_packet(self, packet): - if len(packet) < 40: - return - - try: - stream_id = packet[:36].decode().strip() - seq = int.from_bytes(packet[36:40], 'big') - chunk = packet[40:] - - if stream_id not in self.streams: - self.streams[stream_id] = {} - self.data[stream_id] = [] - - self.streams[stream_id][seq] = chunk - except: - pass - - def get_data(self, stream_id): - if stream_id not in self.streams: - return b"" - - result = [] - seq = 0 - while seq in self.streams[stream_id]: - result.append(self.streams[stream_id][seq]) - seq += 1 - - return b"".join(result) - -async def create_peer(name, is_server=False): - conn_info = get_connection_info(name) - room_id = conn_info["room_id"] - peer_id = conn_info["peer_id"] - credentials = conn_info["credentials"] - ws_url = conn_info["client_configuration"]["media_server_url"] - - pc_sub = RTCPeerConnection(RTCConfiguration( - iceServers=[RTCIceServer(urls=["stun:stun.rtc.yandex.net:3478"])] - )) - - pc_pub = RTCPeerConnection(RTCConfiguration( - iceServers=[RTCIceServer(urls=["stun:stun.rtc.yandex.net:3478"])] - )) - - dc_pub = pc_pub.createDataChannel("dcstream", ordered=True) - dc_pub_open = asyncio.Event() - - receiver = StreamReceiver() - - stats = { - "sent": 0, - "received": 0, - "bytes_sent": 0, - "bytes_received": 0, - "start_time": None, - "commands": [] - } - - @dc_pub.on("open") - def on_pub_open(): - dc_pub_open.set() - - @dc_pub.on("message") - def on_pub_msg(msg): - if isinstance(msg, str): - stats["commands"].append(msg) - else: - receiver.handle_packet(msg) - stats["received"] += 1 - stats["bytes_received"] += len(msg) - - @pc_sub.on("datachannel") - def on_sub_dc(channel): - @channel.on("message") - def on_message(message): - if isinstance(message, str): - stats["commands"].append(message) - - if is_server and message.startswith("STREAM "): - url = message[7:].strip() - asyncio.create_task(stream_data(url, dc_pub, stats)) - else: - receiver.handle_packet(message) - stats["received"] += 1 - stats["bytes_received"] += len(message) - - ws = await websockets.connect(ws_url) - - hello_msg = { - "uid": generate_uuid(), - "hello": { - "participantMeta": {"name": name, "role": "SPEAKER", "sendAudio": False, "sendVideo": False}, - "participantAttributes": {"name": name, "role": "SPEAKER"}, - "sendAudio": False, - "sendVideo": False, - "sendSharing": False, - "participantId": peer_id, - "roomId": room_id, - "serviceName": "telemost", - "credentials": credentials, - "capabilitiesOffer": { - "offerAnswerMode": ["SEPARATE"], - "initialSubscriberOffer": ["ON_HELLO"], - "slotsMode": ["FROM_CONTROLLER"], - "simulcastMode": ["DISABLED"], - "selfVadStatus": ["FROM_SERVER"], - "dataChannelSharing": ["TO_RTP"] - }, - "sdkInfo": {"implementation": "python", "version": "1.0.0", "userAgent": f"DCStream-{name}"}, - "sdkInitializationId": generate_uuid(), - "disablePublisher": False, - "disableSubscriber": False - } - } - - await ws.send(json.dumps(hello_msg)) - - publisher_sdp_sent = False - - async def ws_handler(): - nonlocal publisher_sdp_sent - while True: - try: - data = json.loads(await ws.recv()) - - if "serverHello" in data: - await ws.send(json.dumps({"uid": data["uid"], "ack": {"status": {"code": "OK"}}})) - - if "subscriberSdpOffer" in data and not publisher_sdp_sent: - await pc_sub.setRemoteDescription(RTCSessionDescription( - sdp=data["subscriberSdpOffer"]["sdp"], type="offer" - )) - - answer = await pc_sub.createAnswer() - await pc_sub.setLocalDescription(answer) - - await ws.send(json.dumps({ - "uid": generate_uuid(), - "subscriberSdpAnswer": { - "pcSeq": data["subscriberSdpOffer"]["pcSeq"], - "sdp": pc_sub.localDescription.sdp - } - })) - - await ws.send(json.dumps({"uid": data["uid"], "ack": {"status": {"code": "OK"}}})) - await asyncio.sleep(0.3) - - pub_offer = await pc_pub.createOffer() - await pc_pub.setLocalDescription(pub_offer) - - await ws.send(json.dumps({ - "uid": generate_uuid(), - "publisherSdpOffer": { - "pcSeq": 1, - "sdp": pc_pub.localDescription.sdp - } - })) - publisher_sdp_sent = True - - if "publisherSdpAnswer" in data: - await pc_pub.setRemoteDescription(RTCSessionDescription( - sdp=data["publisherSdpAnswer"]["sdp"], type="answer" - )) - await ws.send(json.dumps({"uid": data["uid"], "ack": {"status": {"code": "OK"}}})) - - if "webrtcIceCandidate" in data: - cand = data["webrtcIceCandidate"] - try: - parts = cand["candidate"].split() - if len(parts) >= 8: - ice = RTCIceCandidate( - component=int(parts[1]), - foundation=parts[0].replace("candidate:", ""), - ip=parts[4], - port=int(parts[5]), - priority=int(parts[3]), - protocol=parts[2], - type=parts[7], - sdpMid=cand["sdpMid"], - sdpMLineIndex=cand["sdpMlineIndex"] - ) - - if cand.get("target") == "SUBSCRIBER": - await pc_sub.addIceCandidate(ice) - elif cand.get("target") == "PUBLISHER": - await pc_pub.addIceCandidate(ice) - except: - pass - - except: - break - - @pc_sub.on("icecandidate") - async def on_sub_ice(event): - if event.candidate: - await ws.send(json.dumps({ - "uid": generate_uuid(), - "webrtcIceCandidate": { - "candidate": event.candidate.candidate, - "sdpMid": event.candidate.sdpMid, - "sdpMlineIndex": event.candidate.sdpMLineIndex, - "target": "SUBSCRIBER", - "pcSeq": 1 - } - })) - - @pc_pub.on("icecandidate") - async def on_pub_ice(event): - if event.candidate: - await ws.send(json.dumps({ - "uid": generate_uuid(), - "webrtcIceCandidate": { - "candidate": event.candidate.candidate, - "sdpMid": event.candidate.sdpMid, - "sdpMlineIndex": event.candidate.sdpMLineIndex, - "target": "PUBLISHER", - "pcSeq": 1 - } - })) - - ws_task = asyncio.create_task(ws_handler()) - - return { - "name": name, - "dc_pub": dc_pub, - "dc_pub_open": dc_pub_open, - "stats": stats, - "receiver": receiver, - "ws": ws, - "ws_task": ws_task, - "pc_sub": pc_sub, - "pc_pub": pc_pub - } - -async def stream_data(url, dc, stats): - try: - if not url.startswith(('http://', 'https://')): - url = 'https://' + url - - print(f" -> Streaming {url}...") - - stream_id = generate_uuid() - seq = 0 - - response = requests.get(url, stream=True, timeout=30) - response.raise_for_status() - - total_size = int(response.headers.get('content-length', 0)) - - if stats["start_time"] is None: - import time - stats["start_time"] = time.time() - - dc.send(f"START {stream_id} {total_size}") - - bytes_sent = 0 - last_progress = 0 - stall_count = 0 - - for chunk in response.iter_content(chunk_size=CHUNK_SIZE): - if not chunk: - break - - wait_count = 0 - while dc.bufferedAmount > BUFFER_THRESHOLD: - await asyncio.sleep(0.01) - wait_count += 1 - if wait_count > 500: - stall_count += 1 - if stall_count > 3: - print(f" X Buffer stalled at {dc.bufferedAmount} bytes") - raise Exception("Buffer overflow") - wait_count = 0 - - packet = stream_id.encode().ljust(36) + seq.to_bytes(4, 'big') + chunk - dc.send(packet) - - stats["sent"] += 1 - stats["bytes_sent"] += len(packet) - bytes_sent += len(chunk) - seq += 1 - - await asyncio.sleep(SEND_DELAY) - - if total_size > 0: - progress = (bytes_sent / total_size) * 100 - if progress - last_progress >= 5: - print(f" -> Sending: {bytes_sent / 1024 / 1024:.2f} MB / {total_size / 1024 / 1024:.2f} MB ({progress:.1f}%) [buffer: {dc.bufferedAmount}]") - last_progress = progress - - dc.send(f"END {stream_id}") - - import time - elapsed = time.time() - stats["start_time"] - mbps = (stats["bytes_sent"] * 8) / (elapsed * 1_000_000) if elapsed > 0 else 0 - - print(f" :P Streamed {stats['bytes_sent']} bytes in {elapsed:.2f}s ({mbps:.2f} Mbps)") - - except Exception as e: - print(f" X Error: {e}") - try: - dc.send(f"ERROR {str(e)}") - except: - pass - -async def run_stream(): - print(r""" - DCStream - High-Speed DataChannel - Optimized streaming over Yandex Telemost - by zarazaex for olc -""") - - print("[1/3] Creating server peer...") - try: - server = await create_peer("Server", is_server=True) - await asyncio.wait_for(server["dc_pub_open"].wait(), timeout=10.0) - print(" :P Server ready") - except Exception as e: - print(f" X Error: {e}") - return - - print("\n[2/3] Creating client peer...") - try: - client = await create_peer("Client", is_server=False) - await asyncio.wait_for(client["dc_pub_open"].wait(), timeout=10.0) - print(" :P Client ready") - except Exception as e: - print(f" X Error: {e}") - return - - print("\n[3/3] Starting stream...") - await asyncio.sleep(2) - - url = "https://raw.githubusercontent.com/openlibrecommunity/olcng/refs/heads/master/olcng.apk" - print(f" -> Client requesting: {url}") - - stream_id = None - total_size = 0 - last_received = 0 - - client["dc_pub"].send(f"STREAM {url}") - - print(" -> Receiving stream...") - - for i in range(300): - await asyncio.sleep(0.5) - - for cmd in client["stats"]["commands"]: - if cmd.startswith("START "): - parts = cmd.split() - stream_id = parts[1] - total_size = int(parts[2]) - print(f" -> Stream started, expecting {total_size} bytes ({total_size / 1024 / 1024:.2f} MB)") - - elif cmd.startswith("END "): - if stream_id: - final_data = client["receiver"].get_data(stream_id) - print(f"\n :P Received {len(final_data)} bytes ({len(final_data) / 1024 / 1024:.2f} MB)") - - if len(final_data) < 1024: - print("\n--- Stream Content ---") - try: - print(final_data.decode('utf-8')) - except: - print(f"[Binary data: {len(final_data)} bytes]") - print("--- End ---\n") - else: - print(f" -> Large file received, skipping content display") - - print("\nCleaning up...") - server["ws_task"].cancel() - client["ws_task"].cancel() - await server["ws"].close() - await client["ws"].close() - await server["pc_sub"].close() - await server["pc_pub"].close() - await client["pc_sub"].close() - await client["pc_pub"].close() - - print(":P Stream complete") - return - - elif cmd.startswith("ERROR"): - print(f"\n X {cmd}\n") - return - - client["stats"]["commands"].clear() - - if stream_id and total_size > 0: - if stream_id in client["receiver"].streams: - current_received = len(client["receiver"].streams[stream_id]) * CHUNK_SIZE - if current_received > last_received: - progress = (current_received / total_size) * 100 if total_size > 0 else 0 - print(f" -> Receiving: {current_received / 1024 / 1024:.2f} MB / {total_size / 1024 / 1024:.2f} MB ({progress:.1f}%) [packets: {client['stats']['received']}]") - last_received = current_received - - print("\n X Timeout waiting for stream") - -async def main(): - await run_stream() - -if __name__ == "__main__": - try: - asyncio.run(main()) - except KeyboardInterrupt: - print("\n\nStream interrupted.") diff --git a/code/flood.py b/code/flood.py deleted file mode 100755 index a240b35..0000000 --- a/code/flood.py +++ /dev/null @@ -1,144 +0,0 @@ -#!/usr/bin/env python3 - -import asyncio -import json -import uuid -import requests -from urllib.parse import quote -import websockets - -CONFERENCE_ID = "75047680642749" -CONFERENCE_URL = f"https://telemost.yandex.ru/j/{CONFERENCE_ID}" -API_BASE = "https://cloud-api.yandex.ru/telemost_front/v2/telemost" - -def generate_uuid(): - return str(uuid.uuid4()) - -def get_connection_info(display_name): - url = f"{API_BASE}/conferences/{quote(CONFERENCE_URL, safe='')}/connection" - params = { - "next_gen_media_platform_allowed": "true", - "display_name": display_name, - "waiting_room_supported": "true" - } - - headers = { - "User-Agent": "Mozilla/5.0 (X11; Linux x86_64; rv:149.0) Gecko/20100101 Firefox/149.0", - "Accept": "*/*", - "content-type": "application/json", - "Client-Instance-Id": generate_uuid(), - "X-Telemost-Client-Version": "187.1.0", - "idempotency-key": generate_uuid(), - "Origin": "https://telemost.yandex.ru", - "Referer": "https://telemost.yandex.ru/" - } - - response = requests.get(url, params=params, headers=headers) - response.raise_for_status() - return response.json() - -async def connect_peer(name, peer_num): - try: - conn_info = get_connection_info(name) - room_id = conn_info["room_id"] - peer_id = conn_info["peer_id"] - credentials = conn_info["credentials"] - ws_url = conn_info["client_configuration"]["media_server_url"] - - ws = await websockets.connect(ws_url) - - hello_msg = { - "uid": generate_uuid(), - "hello": { - "participantMeta": { - "name": name, - "role": "SPEAKER", - "description": "", - "sendAudio": False, - "sendVideo": False - }, - "participantAttributes": { - "name": name, - "role": "SPEAKER", - "description": "" - }, - "sendAudio": False, - "sendVideo": False, - "sendSharing": False, - "participantId": peer_id, - "roomId": room_id, - "serviceName": "telemost", - "credentials": credentials, - "capabilitiesOffer": { - "offerAnswerMode": ["SEPARATE"], - "initialSubscriberOffer": ["ON_HELLO"], - "slotsMode": ["FROM_CONTROLLER"], - "simulcastMode": ["DISABLED"], - "selfVadStatus": ["FROM_SERVER"] - }, - "sdkInfo": { - "implementation": "python", - "version": "1.0.0", - "userAgent": "SEXEVEN-Bot" - }, - "sdkInitializationId": generate_uuid(), - "disablePublisher": False, - "disableSubscriber": False - } - } - - await ws.send(json.dumps(hello_msg)) - - async def keep_alive(): - while True: - try: - data = json.loads(await ws.recv()) - if "serverHello" in data: - await ws.send(json.dumps({"uid": data["uid"], "ack": {"status": {"code": "OK"}}})) - except: - break - - print(f"[:P] {name} connected (#{peer_num})") - - await keep_alive() - - except Exception as e: - print(f"[✗] {name} failed: {e}") - -async def main(): - print(r""" - - SEXEVEN FLOOD стойте пацаны это не флуд это просто мемчик - Connecting 40 peers to conference - -""") - - print(f"Target: {CONFERENCE_URL}\n") - print("Starting flood...\n") - - tasks = [] - - for i in range(1, 412): - suffix = "МЕМЫ" * i - name = f"СТОЙ ДАВАЙ ДРУЖИТЬ И ШУТИТЬ ПРОСТО Я ЖЕ ХОТЕЛ ПРОСТО А ТЫ ЗАЧЕМ ТО ГОВОРИШЬ СТОЙ ТЧО ЗАЧЕМ {suffix}" - - task = asyncio.create_task(connect_peer(name, i)) - tasks.append(task) - - await asyncio.sleep(0.5) - - print(f"\n[*] All 40 peers launched!") - print(f"[*] Keeping connections alive (Ctrl+C to stop)...\n") - - try: - await asyncio.gather(*tasks) - except KeyboardInterrupt: - print("\n[*] Stopping flood...") - for task in tasks: - task.cancel() - -if __name__ == "__main__": - try: - asyncio.run(main()) - except KeyboardInterrupt: - print("\nFlood stopped.") diff --git a/code/info.py b/code/info.py deleted file mode 100755 index a9813cd..0000000 --- a/code/info.py +++ /dev/null @@ -1,565 +0,0 @@ -#!/usr/bin/env python3 - -import asyncio -import json -import uuid -import requests -from urllib.parse import quote -import websockets -from datetime import datetime - -CONFERENCE_ID = "75047680642749" -CONFERENCE_URL = f"https://telemost.yandex.ru/j/{CONFERENCE_ID}" -API_BASE = "https://cloud-api.yandex.ru/telemost_front/v2/telemost" - -session = requests.Session() - -def log(msg, level="INFO"): - timestamp = datetime.now().strftime("%H:%M:%S.%f")[:-3] - log_msg = f"[{timestamp}] [{level}] {msg}" - print(log_msg) - -def generate_uuid(): - return str(uuid.uuid4()) - -def get_connection_info(display_name): - url = f"{API_BASE}/conferences/{quote(CONFERENCE_URL, safe='')}/connection" - params = { - "next_gen_media_platform_allowed": "true", - "display_name": display_name, - "waiting_room_supported": "true" - } - - headers = { - "User-Agent": "Mozilla/5.0 (X11; Linux x86_64; rv:149.0) Gecko/20100101 Firefox/149.0", - "Accept": "*/*", - "content-type": "application/json", - "Client-Instance-Id": generate_uuid(), - "X-Telemost-Client-Version": "187.1.0", - "idempotency-key": generate_uuid(), - "Origin": "https://telemost.yandex.ru", - "Referer": "https://telemost.yandex.ru/" - } - - log(f"GET {url}", "HTTP") - log(f"Params: {params}", "DEBUG") - - response = session.get(url, params=params, headers=headers) - response.raise_for_status() - - log(f"Response: {response.status_code}", "HTTP") - log(f"Cookies: {list(session.cookies.keys())}", "DEBUG") - - return response.json() - -def get_participants_list(peer_ids=None): - url = f"{API_BASE}/conferences/{quote(CONFERENCE_URL, safe='')}/request-states" - - headers = { - "User-Agent": "Mozilla/5.0 (X11; Linux x86_64; rv:149.0) Gecko/20100101 Firefox/149.0", - "Accept": "*/*", - "content-type": "application/json", - "Client-Instance-Id": generate_uuid(), - "idempotency-key": generate_uuid(), - "Origin": "https://telemost.yandex.ru", - "Referer": "https://telemost.yandex.ru/" - } - - if peer_ids: - payload = { - "peers": [{"peer_id": pid} for pid in peer_ids], - "permissions": {}, - "conference": {"version": -1} - } - else: - payload = { - "peers": [], - "permissions": {}, - "conference": {"version": -1} - } - - log(f"POST {url}", "HTTP") - log(f"Payload: {json.dumps(payload)[:100]}...", "DEBUG") - - response = session.post(url, json=payload, headers=headers) - response.raise_for_status() - - log(f"Response: {response.status_code}", "HTTP") - - result = response.json() - log(f"Response body: {json.dumps(result)[:500]}...", "DEBUG") - - return result - -async def collect_webrtc_info(): - log(r""" - - WebRTC Full Information Collector - Complete conference & peer analysis - by zowue for olc - -""", "INFO") - - info = { - "connection": {}, - "participants": {}, - "webrtc": { - "ice": {}, - "sdp": {}, - "datachannel": {}, - "audio": {}, - "video": {} - }, - "server": {} - } - - log("[1/5] Getting connection info...") - try: - conn_info = get_connection_info("InfoCollector") - - cookies_dict = session.cookies.get_dict() - if cookies_dict: - log(f" :P Cookies: {', '.join(cookies_dict.keys())}", "INFO") - - info["connection"] = { - "connection_type": conn_info.get("connection_type"), - "uri": conn_info.get("uri"), - "room_id": conn_info.get("room_id"), - "safe_room_id": conn_info.get("safe_room_id"), - "peer_id": conn_info.get("peer_id"), - "session_id": conn_info.get("session_id"), - "peer_session_id": conn_info.get("peer_session_id"), - "expiration_time": conn_info.get("expiration_time"), - "conference_limit": conn_info.get("conference_limit"), - "media_platform": conn_info.get("media_platform") - } - - client_config = conn_info.get("client_configuration", {}) - info["connection"]["client_config"] = { - "media_server_url": client_config.get("media_server_url"), - "service_name": client_config.get("service_name"), - "session_timeout_ms": client_config.get("goloom_session_open_ms"), - "reconnect_wait_ms": client_config.get("wait_time_to_reconnect_ms") - } - - log(f" :P Room: {info['connection']['room_id']}") - log(f" :P Peer: {info['connection']['peer_id']}") - log(f" :P Limit: {info['connection']['conference_limit']}") - except Exception as e: - log(f" X Error: {e}", "ERROR") - return info - - log("\n[2/5] Connecting to WebSocket...") - try: - ws_url = client_config.get("media_server_url") - log(f" -> WS URL: {ws_url}", "DEBUG") - ws = await websockets.connect(ws_url) - - hello_msg = { - "uid": generate_uuid(), - "hello": { - "participantMeta": {"name": "InfoCollector", "role": "SPEAKER", "sendAudio": False, "sendVideo": False}, - "participantAttributes": {"name": "InfoCollector", "role": "SPEAKER"}, - "sendAudio": False, - "sendVideo": False, - "sendSharing": False, - "participantId": conn_info["peer_id"], - "roomId": conn_info["room_id"], - "serviceName": "telemost", - "credentials": conn_info["credentials"], - "capabilitiesOffer": { - "offerAnswerMode": ["SEPARATE"], - "initialSubscriberOffer": ["ON_HELLO"], - "slotsMode": ["FROM_CONTROLLER"], - "simulcastMode": ["STATIC"], - "selfVadStatus": ["FROM_SERVER"], - "dataChannelSharing": ["TO_RTP"] - }, - "sdkInfo": {"implementation": "python", "version": "1.0.0", "userAgent": "InfoCollector"}, - "sdkInitializationId": generate_uuid(), - "disablePublisher": False, - "disableSubscriber": False - } - } - - await ws.send(json.dumps(hello_msg)) - log(" :P Connected") - log(f" -> Sent hello message", "DEBUG") - - log("\n[3/5] Collecting participants...") - info["participants"]["list"] = [] - collected_peer_ids = set() - - log("\n[4/5] Collecting WebRTC details...") - - for i in range(25): - try: - data = json.loads(await asyncio.wait_for(ws.recv(), timeout=1.0)) - - msg_type = next((k for k in data.keys() if k != "uid"), "unknown") - log(f" <- WS message #{i+1}: {msg_type}", "WS") - - if "updateDescription" in data: - update_desc = data["updateDescription"] - log(f" -> updateDescription: {json.dumps(update_desc)[:200]}...", "DEBUG") - - if "description" in update_desc: - for desc_item in update_desc["description"]: - peer_id = desc_item.get("id") - - if peer_id: - log(f" -> Found peer in updateDescription: {peer_id}", "DEBUG") - - if peer_id not in collected_peer_ids: - collected_peer_ids.add(peer_id) - - meta = desc_item.get("meta", {}) - peer_data = { - "peer_id": peer_id, - "display_name": meta.get("name"), - "role": meta.get("role"), - "send_audio": meta.get("sendAudio"), - "send_video": meta.get("sendVideo"), - "send_sharing": desc_item.get("sendSharing", False) - } - - info["participants"]["list"].append(peer_data) - audio_status = "ON" if peer_data['send_audio'] else "OFF" - video_status = "ON" if peer_data['send_video'] else "OFF" - log(f" :P {peer_data['display_name']} (A:{audio_status} V:{video_status})") - - await ws.send(json.dumps({"uid": data["uid"], "ack": {"status": {"code": "OK"}}})) - - if "serverHello" in data: - server_hello = data["serverHello"] - - if "capabilitiesAnswer" in server_hello: - caps = server_hello["capabilitiesAnswer"] - info["webrtc"]["capabilities"] = { - "offerAnswerMode": caps.get("offerAnswerMode"), - "initialSubscriberOffer": caps.get("initialSubscriberOffer"), - "slotsMode": caps.get("slotsMode"), - "simulcastMode": caps.get("simulcastMode"), - "selfVadStatus": caps.get("selfVadStatus"), - "dataChannelSharing": caps.get("dataChannelSharing"), - "videoEncoderConfig": caps.get("videoEncoderConfig"), - "dataChannelVideoCodec": caps.get("dataChannelVideoCodec"), - "bandwidthLimitationReason": caps.get("bandwidthLimitationReason"), - "publisherVp9": caps.get("publisherVp9"), - "svcMode": caps.get("svcMode") - } - - if "servingComponents" in server_hello: - info["server"]["components"] = [] - for comp in server_hello["servingComponents"]: - info["server"]["components"].append({ - "type": comp.get("type"), - "host": comp.get("host"), - "version": comp.get("version") - }) - - if "rtcConfiguration" in server_hello: - rtc_config = server_hello["rtcConfiguration"] - info["webrtc"]["ice"]["servers"] = [] - - for server in rtc_config.get("iceServers", []): - info["webrtc"]["ice"]["servers"].append({ - "urls": server.get("urls"), - "has_credentials": bool(server.get("credential")) - }) - - if "pingPongConfiguration" in server_hello: - ping_config = server_hello["pingPongConfiguration"] - info["webrtc"]["ping"] = { - "interval_ms": ping_config.get("pingInterval"), - "ack_timeout_ms": ping_config.get("ackTimeout") - } - - if "telemetryConfiguration" in server_hello: - telem_config = server_hello["telemetryConfiguration"] - info["webrtc"]["telemetry"] = { - "sending_interval_ms": telem_config.get("sendingInterval") - } - - await ws.send(json.dumps({"uid": data["uid"], "ack": {"status": {"code": "OK"}}})) - log(" :P Server hello") - - if "subscriberSdpOffer" in data: - sdp = data["subscriberSdpOffer"]["sdp"] - - info["webrtc"]["sdp"]["raw"] = sdp - info["webrtc"]["sdp"]["lines"] = sdp.count('\r\n') - - info["webrtc"]["audio"]["codecs"] = [] - info["webrtc"]["video"]["codecs"] = [] - info["webrtc"]["datachannel"]["supported"] = False - - for line in sdp.split('\r\n'): - if line.startswith('a=rtpmap:') and 'm=audio' in sdp[:sdp.find(line)]: - parts = line.split() - if len(parts) >= 2: - codec_info = parts[1].split('/') - info["webrtc"]["audio"]["codecs"].append({ - "name": codec_info[0], - "rate": int(codec_info[1]) if len(codec_info) > 1 else None, - "channels": int(codec_info[2]) if len(codec_info) > 2 else None - }) - - if line.startswith('a=rtpmap:') and 'm=video' in sdp[:sdp.find(line)]: - parts = line.split() - if len(parts) >= 2: - codec_info = parts[1].split('/') - codec_name = codec_info[0].upper() - if codec_name not in ['RTX', 'RED', 'ULPFEC']: - if codec_name not in [c["name"] for c in info["webrtc"]["video"]["codecs"]]: - info["webrtc"]["video"]["codecs"].append({ - "name": codec_name, - "rate": int(codec_info[1]) if len(codec_info) > 1 else None - }) - - if 'm=application' in line and 'SCTP' in line: - info["webrtc"]["datachannel"]["supported"] = True - - if 'sctp-port:' in line: - info["webrtc"]["datachannel"]["sctp_port"] = int(line.split(':')[1].strip()) - - if 'max-message-size:' in line: - size = int(line.split(':')[1].strip()) - info["webrtc"]["datachannel"]["max_message_size"] = size - info["webrtc"]["datachannel"]["max_message_size_mb"] = size / 1024 / 1024 - - if line.startswith('a=fmtp:') and 'opus' in sdp[max(0, sdp.find(line)-200):sdp.find(line)].lower(): - params = line.split(':', 1)[1].strip().split(';') - info["webrtc"]["audio"]["opus_params"] = {} - for param in params: - if '=' in param: - key, val = param.strip().split('=') - info["webrtc"]["audio"]["opus_params"][key] = val - - if line.startswith('a=extmap:'): - if "rtp_extensions" not in info["webrtc"]: - info["webrtc"]["rtp_extensions"] = [] - ext_info = line.split()[1] - info["webrtc"]["rtp_extensions"].append(ext_info) - - await ws.send(json.dumps({"uid": data["uid"], "ack": {"status": {"code": "OK"}}})) - log(" :P SDP analyzed") - - except asyncio.TimeoutError: - log(f" -> Timeout on message #{i+1}, continuing...", "DEBUG") - break - except Exception as e: - log(f" X Error processing message: {e}", "ERROR") - break - - info["participants"]["count"] = len(info["participants"]["list"]) - - await ws.close() - log(" :P Closed") - - log("\n -> Fetching all participants via API...") - try: - all_participants = get_participants_list() - - log(f" -> API returned: peers={len(all_participants.get('peers', []))}, permissions={bool(all_participants.get('permissions'))}", "DEBUG") - - if "peers" in all_participants and all_participants["peers"]: - log(f" :P Found {len(all_participants['peers'])} participants via API") - - for peer in all_participants["peers"]: - peer_id = peer.get("peer_id") - if peer_id and peer_id not in collected_peer_ids: - peer_data = { - "peer_id": peer_id, - "peer_type": peer.get("peer_type"), - "send_audio": None, - "send_video": None, - "send_sharing": None - } - - if "state" in peer and "user_data" in peer["state"]: - user_data = peer["state"]["user_data"] - peer_data["display_name"] = user_data.get("display_name") - peer_data["role"] = user_data.get("role") - peer_data["uid"] = user_data.get("uid") - if "avatar_placeholder" in user_data: - peer_data["avatar"] = user_data["avatar_placeholder"] - - info["participants"]["list"].append(peer_data) - collected_peer_ids.add(peer_id) - log(f" :P {peer_data.get('display_name', 'Unknown')} ({peer_data.get('role', 'N/A')})") - - info["participants"]["count"] = len(info["participants"]["list"]) - else: - log(f" X No peers in API response", "WARN") - - if "permissions" in all_participants: - info["conference"] = {"permissions": all_participants["permissions"]} - if "conference" in all_participants: - info["conference"]["state"] = all_participants["conference"].get("state") - - except Exception as e: - log(f" X API request failed: {e}", "ERROR") - - log(f"\n :P Total participants: {info['participants']['count']}") - - except Exception as e: - log(f" X Error: {e}", "ERROR") - - return info - -def print_full_report(info): - print("CONNECTION INFO") - conn = info["connection"] - print(f"Type: {conn.get('connection_type')}") - print(f"URI: {conn.get('uri')}") - print(f"Room ID: {conn.get('room_id')}") - print(f"Peer ID: {conn.get('peer_id')}") - print(f"Session ID: {conn.get('session_id')}") - print(f"Media Platform: {conn.get('media_platform')}") - print(f"Max Participants: {conn.get('conference_limit')}") - - if "client_config" in conn: - cfg = conn["client_config"] - print(f"\nClient Configuration:") - print(f" Media Server: {cfg.get('media_server_url')}") - print(f" Session Timeout: {cfg.get('session_timeout_ms')}ms ({cfg.get('session_timeout_ms', 0)/1000/60:.1f}min)") - print(f" Reconnect Wait: {cfg.get('reconnect_wait_ms')}ms") - - print("PARTICIPANTS") - if "list" in info["participants"] and info["participants"]["list"]: - for i, peer in enumerate(info["participants"]["list"], 1): - print(f"\n{i}. {peer.get('display_name', 'Unknown')}") - print(f" Peer ID: {peer.get('peer_id')}") - print(f" Role: {peer.get('role')}") - if peer.get('uid'): - print(f" UID: {peer.get('uid')}") - print(f" Audio: {peer.get('send_audio')}") - print(f" Video: {peer.get('send_video')}") - print(f" Sharing: {peer.get('send_sharing')}") - if "avatar" in peer: - avatar = peer["avatar"] - print(f" Avatar: {avatar.get('abbreviation')} ({avatar.get('background_color')})") - print(f"\nTotal: {info['participants'].get('count', 0)} participants") - else: - print("No participants in conference") - - if "conference" in info and "permissions" in info["conference"]: - print("\n" + "=" * 70) - print("CONFERENCE PERMISSIONS") - print("=" * 70) - perms = info["conference"]["permissions"] - print(f"Version: {perms.get('version')}") - - if "public_role_permissions" in perms: - print("\nRole Permissions:") - for role_perm in perms["public_role_permissions"]: - role = role_perm.get("role") - allowed = role_perm.get("allowed", []) - print(f" {role}: {', '.join(allowed)}") - - if "personal_allowed" in perms: - print(f"\nPersonal: {', '.join(perms['personal_allowed'])}") - - if "state" in info["conference"]: - state = info["conference"]["state"] - print("\nConference State:") - print(f" Access Level: {state.get('access_level')}") - print(f" Recording: {state.get('local_recording_allowed')}") - print(f" Cloud Recording: {state.get('cloud_recording_allowed')}") - print(f" Chat: {state.get('chat_allowed')}") - print(f" Control: {state.get('control_allowed')}") - print(f" Broadcast: {state.get('broadcast_allowed')}") - - print("WEBRTC CAPABILITIES") - if "capabilities" in info["webrtc"]: - caps = info["webrtc"]["capabilities"] - print(f"Offer/Answer Mode: {caps.get('offerAnswerMode')}") - print(f"Initial Subscriber Offer: {caps.get('initialSubscriberOffer')}") - print(f"Slots Mode: {caps.get('slotsMode')}") - print(f"Simulcast Mode: {caps.get('simulcastMode')}") - print(f"VAD Status: {caps.get('selfVadStatus')}") - print(f"DataChannel Sharing: {caps.get('dataChannelSharing')}") - print(f"Video Encoder Config: {caps.get('videoEncoderConfig')}") - print(f"DC Video Codec: {caps.get('dataChannelVideoCodec')}") - print(f"Bandwidth Limitation: {caps.get('bandwidthLimitationReason')}") - print(f"Publisher VP9: {caps.get('publisherVp9')}") - print(f"SVC Mode: {caps.get('svcMode')}") - - print("AUDIO") - if "codecs" in info["webrtc"]["audio"]: - print("Codecs:") - for codec in info["webrtc"]["audio"]["codecs"]: - print(f" - {codec['name']}: {codec.get('rate', 'N/A')}Hz, {codec.get('channels', 'N/A')} channels") - - if "opus_params" in info["webrtc"]["audio"]: - print("\nOpus Parameters:") - for key, val in info["webrtc"]["audio"]["opus_params"].items(): - print(f" {key}: {val}") - - print("VIDEO") - if "codecs" in info["webrtc"]["video"]: - print("Codecs:") - for codec in info["webrtc"]["video"]["codecs"]: - print(f" - {codec['name']}: {codec.get('rate', 'N/A')}Hz") - - print("DATACHANNEL") - dc = info["webrtc"]["datachannel"] - print(f"Supported: {':P YES' if dc.get('supported') else 'X NO'}") - if dc.get("supported"): - print(f"SCTP Port: {dc.get('sctp_port')}") - print(f"Max Message Size: {dc.get('max_message_size_mb', 0):.0f}MB ({dc.get('max_message_size', 0):,} bytes)") - print(f"Note: Actual limit is 8KB due to server fragmentation") - - print("ICE/NETWORK") - if "servers" in info["webrtc"]["ice"]: - stun_count = sum(1 for s in info["webrtc"]["ice"]["servers"] if any('stun:' in u for u in s.get("urls", []))) - turn_count = sum(1 for s in info["webrtc"]["ice"]["servers"] if any('turn:' in u for u in s.get("urls", []))) - - print(f"STUN Servers: {stun_count}") - print(f"TURN Servers: {turn_count}") - print("\nServers:") - for server in info["webrtc"]["ice"]["servers"]: - for url in server.get("urls", []): - cred_status = "with credentials" if server.get("has_credentials") else "no credentials" - print(f" - {url} ({cred_status})") - - if "ping" in info["webrtc"]: - ping = info["webrtc"]["ping"] - print(f"\nPing Configuration:") - print(f" Interval: {ping.get('interval_ms')}ms") - print(f" ACK Timeout: {ping.get('ack_timeout_ms')}ms") - - if "rtp_extensions" in info["webrtc"]: - print(f"\nRTP Extensions: {len(info['webrtc']['rtp_extensions'])}") - for ext in info["webrtc"]["rtp_extensions"][:5]: - print(f" - {ext}") - - print("SERVER COMPONENTS") - if "components" in info["server"]: - for comp in info["server"]["components"]: - print(f"{comp.get('type'):20} {comp.get('host'):30} v{comp.get('version')}") - - if "telemetry" in info["webrtc"]: - telem = info["webrtc"]["telemetry"] - print(f"\nTelemetry:") - print(f" Sending Interval: {telem.get('sending_interval_ms')}ms") - - print("SDP STATISTICS") - if "lines" in info["webrtc"]["sdp"]: - print(f"Total SDP Lines: {info['webrtc']['sdp']['lines']}") - print(f"SDP Size: {len(info['webrtc']['sdp'].get('raw', ''))} bytes") - -async def main(): - log(f"Starting WebRTC info collection...") - - info = await collect_webrtc_info() - - log("\n[5/5] Generating report...\n") - print_full_report(info) - -if __name__ == "__main__": - try: - asyncio.run(main()) - except KeyboardInterrupt: - log("\n\nInfo collection interrupted.", "WARN") diff --git a/code/invicible.py b/code/invicible.py deleted file mode 100755 index 4e04c0e..0000000 --- a/code/invicible.py +++ /dev/null @@ -1,592 +0,0 @@ -#!/usr/bin/env python3 - -import asyncio -import json -import uuid -import websockets -import requests -import qrcode -import cv2 -import numpy as np -import base64 -import os -import time -from urllib.parse import quote -from cryptography.hazmat.primitives.ciphers.aead import ChaCha20Poly1305 -from aiortc import RTCPeerConnection, RTCSessionDescription, RTCIceCandidate, RTCConfiguration, RTCIceServer -from aiortc.mediastreams import MediaStreamTrack -from av import VideoFrame -from PIL import Image -from pyzbar import pyzbar -from fractions import Fraction - -CONFERENCE_ID = "75047680642749" -CONFERENCE_URL = f"https://telemost.yandex.ru/j/{CONFERENCE_ID}" -API_BASE = "https://cloud-api.yandex.ru/telemost_front/v2/telemost" - -QR_SIZE = 600 -CHUNK_SIZE = 400 -FRAME_RATE = 1 -SHARED_KEY = os.urandom(32) - -def gen_uid(): - return str(uuid.uuid4()) - -def get_connection_info(display_name): - url = f"{API_BASE}/conferences/{quote(CONFERENCE_URL, safe='')}/connection" - params = { - "next_gen_media_platform_allowed": "true", - "display_name": display_name, - "waiting_room_supported": "true" - } - headers = { - "User-Agent": "Mozilla/5.0 (X11; Linux x86_64; rv:149.0) Gecko/20100101 Firefox/149.0", - "Accept": "*/*", - "content-type": "application/json", - "Client-Instance-Id": gen_uid(), - "X-Telemost-Client-Version": "187.1.0", - "idempotency-key": gen_uid(), - "Origin": "https://telemost.yandex.ru", - "Referer": "https://telemost.yandex.ru/" - } - r = requests.get(url, params=params, headers=headers) - r.raise_for_status() - return r.json() - -def encrypt_payload(tag_str, data_bytes): - nonce = os.urandom(12) - chacha = ChaCha20Poly1305(SHARED_KEY) - ciphertext = chacha.encrypt(nonce, data_bytes, None) - blob = nonce + ciphertext - tag_bytes = tag_str.encode('ascii').ljust(4, b'\x00')[:4] - len_bytes = len(blob).to_bytes(4, 'big') - return tag_bytes + len_bytes + blob - -def decrypt_payload(envelope): - tag = envelope[:4].decode('ascii').strip('\x00') - length = int.from_bytes(envelope[4:8], 'big') - blob = envelope[8:8+length] - nonce = blob[:12] - ciphertext = blob[12:] - chacha = ChaCha20Poly1305(SHARED_KEY) - data = chacha.decrypt(nonce, ciphertext, None) - return tag, data - -def make_qr_frame(data, pts): - qr = qrcode.QRCode( - version=None, - error_correction=qrcode.constants.ERROR_CORRECT_M, - box_size=12, - border=4 - ) - qr.add_data(data) - qr.make(fit=True) - img = qr.make_image(fill_color="black", back_color="white").resize( - (QR_SIZE, QR_SIZE), Image.NEAREST - ) - arr = np.array(img.convert('RGB')) - frame = VideoFrame.from_ndarray(arr, format="rgb24") - frame.pts = pts - frame.time_base = Fraction(1, FRAME_RATE) - return frame - -def chunk_data(data, tid): - b64 = base64.b64encode(data).decode() - n = (len(b64) + CHUNK_SIZE - 1) // CHUNK_SIZE - return [json.dumps({"tid": tid, "idx": i, "total": n, - "data": b64[i * CHUNK_SIZE:(i + 1) * CHUNK_SIZE]}) - for i in range(n)] - -class QRVideoTrack(MediaStreamTrack): - kind = "video" - - def __init__(self): - super().__init__() - self._frames = [] - self._idx = 0 - self._pts = 0 - - def set_data(self, chunks): - self._frames = [make_qr_frame(c, i) for i, c in enumerate(chunks)] - self._idx = 0 - self._pts = 0 - - async def recv(self): - await asyncio.sleep(1.0 / FRAME_RATE) - if not self._frames: - f = make_qr_frame("WAIT", self._pts) - self._pts += 1 - return f - f = self._frames[self._idx] - f.pts = self._pts - f.time_base = Fraction(1, FRAME_RATE) - self._pts += 1 - self._idx = (self._idx + 1) % len(self._frames) - return f - -class DualReceiver: - def __init__(self): - self._bufs = {} - self.vc_result = None - self.dc_result = None - self._cv2_detector = cv2.QRCodeDetector() - - def feed_frame(self, frame): - if self.vc_result is not None: - return False - - try: - arr = frame.to_ndarray(format="rgb24") - h, w = arr.shape[:2] - gray = cv2.cvtColor(arr, cv2.COLOR_RGB2GRAY) - - variants = [ - gray, - cv2.resize(gray, (w * 2, h * 2), interpolation=cv2.INTER_CUBIC), - cv2.threshold(gray, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU)[1], - ] - - decoded = set() - for v in variants: - try: - for code in pyzbar.decode(v): - decoded.add(code.data.decode('utf-8')) - except Exception: - pass - try: - val, _, _ = self._cv2_detector.detectAndDecode(v) - if val: - decoded.add(val) - except Exception: - pass - - for raw in decoded: - try: - pkt = json.loads(raw) - tid = pkt["tid"] - idx = pkt["idx"] - total = pkt["total"] - data = pkt["data"] - if tid not in self._bufs: - self._bufs[tid] = {} - if idx not in self._bufs[tid]: - self._bufs[tid][idx] = data - if len(self._bufs[tid]) == total: - b64 = "".join(self._bufs[tid][i] for i in range(total)) - self.vc_result = base64.b64decode(b64) - return True - except Exception: - pass - except Exception: - pass - return False - -async def process_track(track, receiver): - while True: - try: - frame = await asyncio.wait_for(track.recv(), timeout=30.0) - if receiver.feed_frame(frame): - return - except Exception: - return - -def make_ice_servers(raw_list): - result = [] - for s in raw_list: - urls = s.get("urls", []) - cred = s.get("credential", "") - user = s.get("username", "") - if cred: - result.append(RTCIceServer(urls=urls, credential=cred, username=user)) - else: - result.append(RTCIceServer(urls=urls)) - return result or [RTCIceServer(urls=["stun:stun.rtc.yandex.net:3478"])] - -async def connect_peer(name, conn): - room_id = conn["room_id"] - peer_id = conn["peer_id"] - credentials = conn["credentials"] - ws_url = conn["client_configuration"]["media_server_url"] - is_sender = "Sender" in name - - default_ice = [RTCIceServer(urls=["stun:stun.rtc.yandex.net:3478"])] - - video_track = QRVideoTrack() if is_sender else None - receiver_obj = DualReceiver() if not is_sender else None - track_tasks = [] - - pc_sub_ref = [RTCPeerConnection(RTCConfiguration(iceServers=default_ice))] - pc_pub_ref = [RTCPeerConnection(RTCConfiguration(iceServers=default_ice))] - - dc_pub_ref = [] - dc_open_event = asyncio.Event() - - if is_sender: - pc_pub_ref[0].addTrack(video_track) - dc = pc_pub_ref[0].createDataChannel("invisible", ordered=True) - dc_pub_ref.append(dc) - - @dc.on("open") - def on_open(): - dc_open_event.set() - - ws = await websockets.connect( - ws_url, - additional_headers={ - "Origin": "https://telemost.yandex.ru", - "User-Agent": "Mozilla/5.0 (X11; Linux x86_64; rv:149.0) Gecko/20100101 Firefox/149.0" - } - ) - - async def send(obj): - await ws.send(json.dumps(obj)) - - async def ack(uid): - await send({"uid": uid, "ack": {"status": {"code": "OK", "description": ""}}}) - - def setup_pc(pc_sub, pc_pub): - if not is_sender: - @pc_sub.on("datachannel") - def on_datachannel(channel): - @channel.on("message") - def on_message(message): - if receiver_obj is not None: - receiver_obj.dc_result = message - - @pc_sub.on("track") - def on_track(track): - if track.kind == "video" and receiver_obj is not None: - t = asyncio.ensure_future(process_track(track, receiver_obj)) - track_tasks.append(t) - - @pc_sub.on("icecandidate") - async def on_sub_ice(e): - if e.candidate: - await send({ - "uid": gen_uid(), - "webrtcIceCandidate": { - "candidate": e.candidate.candidate, - "sdpMid": e.candidate.sdpMid, - "sdpMlineIndex": e.candidate.sdpMLineIndex, - "usernameFragment": "", - "target": "SUBSCRIBER", - "pcSeq": 1 - } - }) - - @pc_pub.on("icecandidate") - async def on_pub_ice(e): - if e.candidate: - await send({ - "uid": gen_uid(), - "webrtcIceCandidate": { - "candidate": e.candidate.candidate, - "sdpMid": e.candidate.sdpMid, - "sdpMlineIndex": e.candidate.sdpMLineIndex, - "usernameFragment": "", - "target": "PUBLISHER", - "pcSeq": 1 - } - }) - - setup_pc(pc_sub_ref[0], pc_pub_ref[0]) - - hello = { - "uid": gen_uid(), - "hello": { - "participantMeta": { - "name": name, "role": "SPEAKER", "description": "", - "sendAudio": False, "sendVideo": is_sender - }, - "participantAttributes": {"name": name, "role": "SPEAKER", "description": ""}, - "sendAudio": False, - "sendVideo": is_sender, - "sendSharing": False, - "participantId": peer_id, - "roomId": room_id, - "serviceName": "telemost", - "credentials": credentials, - "capabilitiesOffer": { - "offerAnswerMode": ["SEPARATE"], - "initialSubscriberOffer": ["ON_HELLO"], - "slotsMode": ["FROM_CONTROLLER"], - "simulcastMode": ["DISABLED", "STATIC"], - "selfVadStatus": ["FROM_SERVER", "FROM_CLIENT"], - "dataChannelSharing": ["TO_RTP"], - "videoEncoderConfig": ["NO_CONFIG", "ONLY_INIT_CONFIG", "RUNTIME_CONFIG"], - "dataChannelVideoCodec": ["VP8", "UNIQUE_CODEC_FROM_TRACK_DESCRIPTION"] - }, - "sdkInfo": { - "implementation": "browser", - "version": "5.27.0", - "userAgent": "Mozilla/5.0", - "hwConcurrency": 24 - }, - "sdkInitializationId": gen_uid(), - "disablePublisher": not is_sender, - "disableSubscriber": False, - "disableSubscriberAudio": True - } - } - - await send(hello) - pub_sdp_sent = False - - async def ws_loop(): - nonlocal pub_sdp_sent - try: - async for raw in ws: - msg = json.loads(raw) - keys = [k for k in msg if k != "uid"] - if not keys: - continue - mtype = keys[0] - uid = msg.get("uid", "") - - if mtype == "ack": - pass - - elif mtype == "serverHello": - sh = msg["serverHello"] - raw_ice = sh.get("rtcConfiguration", {}).get("iceServers", []) - if raw_ice: - ice = make_ice_servers(raw_ice) - old_sub = pc_sub_ref[0] - old_pub = pc_pub_ref[0] - pc_sub_ref[0] = RTCPeerConnection(RTCConfiguration(iceServers=ice)) - pc_pub_ref[0] = RTCPeerConnection(RTCConfiguration(iceServers=ice)) - - if is_sender: - pc_pub_ref[0].addTrack(video_track) - dc = pc_pub_ref[0].createDataChannel("invisible", ordered=True) - dc_pub_ref.clear() - dc_pub_ref.append(dc) - @dc.on("open") - def on_open(): - dc_open_event.set() - - setup_pc(pc_sub_ref[0], pc_pub_ref[0]) - await old_sub.close() - await old_pub.close() - await ack(uid) - - elif mtype == "subscriberSdpOffer": - offer_sdp = msg["subscriberSdpOffer"]["sdp"] - pc_seq = msg["subscriberSdpOffer"]["pcSeq"] - pc_sub = pc_sub_ref[0] - pc_pub = pc_pub_ref[0] - - await pc_sub.setRemoteDescription( - RTCSessionDescription(sdp=offer_sdp, type="offer") - ) - answer = await pc_sub.createAnswer() - await pc_sub.setLocalDescription(answer) - - await send({ - "uid": gen_uid(), - "subscriberSdpAnswer": { - "pcSeq": pc_seq, - "sdp": pc_sub.localDescription.sdp - } - }) - await ack(uid) - - if not is_sender: - await send({ - "uid": gen_uid(), - "setSlots": { - "slots": [{"width": 1280, "height": 720}], - "audioSlotsCount": 0, - "key": 1, - "shutdownAllVideo": None, - "withSelfView": False, - "selfViewVisibility": "ON_LOADING_THEN_SHOW", - "gridConfig": {} - } - }) - - if is_sender and not pub_sdp_sent: - await asyncio.sleep(0.3) - pub_offer = await pc_pub.createOffer() - await pc_pub.setLocalDescription(pub_offer) - tracks_info = [] - for t in pc_pub.getTransceivers(): - if t.sender.track: - tracks_info.append({ - "mid": t.mid, - "transceiverMid": t.mid, - "kind": t.sender.track.kind.upper(), - "priority": 0, - "label": "QRVideoTrack", - "codecs": {}, - "groupId": 1, - "description": "" - }) - await send({ - "uid": gen_uid(), - "publisherSdpOffer": { - "pcSeq": 1, - "sdp": pc_pub.localDescription.sdp, - "tracks": tracks_info - } - }) - pub_sdp_sent = True - - elif mtype == "publisherSdpAnswer": - await pc_pub_ref[0].setRemoteDescription( - RTCSessionDescription(sdp=msg["publisherSdpAnswer"]["sdp"], type="answer") - ) - await ack(uid) - - elif mtype == "webrtcIceCandidate": - cand = msg["webrtcIceCandidate"] - parts = cand.get("candidate", "").split() - if len(parts) >= 8: - try: - ice_c = RTCIceCandidate( - component=int(parts[1]), - foundation=parts[0].replace("candidate:", ""), - ip=parts[4], - port=int(parts[5]), - priority=int(parts[3]), - protocol=parts[2].lower(), - type=parts[7], - sdpMid=cand.get("sdpMid", "0"), - sdpMLineIndex=cand.get("sdpMlineIndex", 0) - ) - if cand.get("target") == "SUBSCRIBER": - await pc_sub_ref[0].addIceCandidate(ice_c) - elif cand.get("target") == "PUBLISHER": - await pc_pub_ref[0].addIceCandidate(ice_c) - except Exception: - pass - - elif mtype in ("setSlots", "slotsConfig", "slotsMeta", "vadActivity", - "updateDescription", "upsertDescription", "sdkCodecsInfo", - "pingPong", "selfQualityReport", "upsertParticipantsQualityReport", - "removeDescription"): - await ack(uid) - else: - if uid: - await ack(uid) - except websockets.exceptions.ConnectionClosed: - pass - except Exception: - pass - - ws_task = asyncio.create_task(ws_loop()) - - return { - "name": name, - "ws": ws, - "ws_task": ws_task, - "pc_pub_ref": pc_pub_ref, - "pc_sub_ref": pc_sub_ref, - "video_track": video_track, - "receiver": receiver_obj, - "track_tasks": track_tasks, - "dc_pub_ref": dc_pub_ref, - "dc_open_event": dc_open_event - } - -async def run(): - print("ChaCha20-Poly1305 over Telemost DC + VC") - print("text + video encrypted transfer") - print(" by zarazaex for olc\n") - - sender_conn = get_connection_info("QR_Sender") - receiver_conn = get_connection_info("QR_Receiver") - - print("[1/4] Generating payloads...") - text_data = "do you want to pet my pu... cat".encode('utf-8') - video_data = os.urandom(2048) - - print(f"-> Text payload: {len(text_data)} bytes") - print(f"-> Video payload: {len(video_data)} bytes\n") - - print("[2/4] Creating sender peer...") - sender = await connect_peer("QR_Sender", sender_conn) - await sender["dc_open_event"].wait() - print(":P Sender ready\n") - - print("[3/4] Creating receiver peer...") - receiver = await connect_peer("QR_Receiver", receiver_conn) - await asyncio.sleep(5) - print(":P Receiver ready\n") - - print("[4/4] Encrypting and sending...\n") - - enc_text = encrypt_payload("TEXT", text_data) - print(f"[TEXT] Original data ({len(text_data)} bytes):") - print(f" UTF-8: {text_data.decode('utf-8')}") - print(f" HEX: {text_data.hex()}") - print(f"[TEXT] Encrypted envelope ({len(enc_text)} bytes):") - print(f" Tag: {enc_text[:4].hex().upper()}") - print(f" Len: {int.from_bytes(enc_text[4:8], 'big')}") - print(f" Blob: {enc_text[8:72].hex()}...\n") - - print("-> Sending TEXT...") - sender["dc_pub_ref"][0].send(enc_text) - print(":P Text sent\n") - - enc_video = encrypt_payload("VID\x00", video_data) - print(f"[VIDEO] Original data ({len(video_data)} bytes):") - print(f" HEX: {video_data[:64].hex()}...") - print(f"[VIDEO] Encrypted envelope ({len(enc_video)} bytes):") - print(f" Tag: {enc_video[:4].hex().upper()}") - print(f" Len: {int.from_bytes(enc_video[4:8], 'big')}") - print(f" Blob: {enc_video[8:72].hex()}...\n") - - print("-> Sending VIDEO...") - tid = gen_uid() - chunks = chunk_data(enc_video, tid) - sender["video_track"].set_data(chunks) - print(":P Video sent\n") - - print("-> Waiting for receiver...") - - for i in range(120): - await asyncio.sleep(1) - if receiver["receiver"].dc_result is not None and receiver["receiver"].vc_result is not None: - break - - dc_res = receiver["receiver"].dc_result - vc_res = receiver["receiver"].vc_result - - if dc_res: - print(f"[Receiver] <- received 'TEXT': {len(dc_res)} bytes") - if vc_res: - print(f"[Receiver] <- received 'VID': {len(vc_res)} bytes") - - print("\n--- Received & Decrypted ---") - - if dc_res: - tag, dec_text = decrypt_payload(dc_res) - print(f"[TEXT] Decrypted ({len(dec_text)} bytes):") - print(f"UTF-8: {dec_text.decode('utf-8')}") - print(f"HEX: {dec_text.hex()}") - - if vc_res: - tag, dec_vid = decrypt_payload(vc_res) - print(f"[VIDEO] Decrypted ({len(dec_vid)} bytes):") - print(f"HEX: {dec_vid[:64].hex()}...") - - print("\nCleaning up...") - for p in [sender, receiver]: - p["ws_task"].cancel() - try: - await p["ws"].close() - except Exception: - pass - try: - await p["pc_pub_ref"][0].close() - await p["pc_sub_ref"][0].close() - except Exception: - pass - print(":P Done") - -if __name__ == "__main__": - try: - asyncio.run(run()) - except KeyboardInterrupt: - pass diff --git a/code/limits.py b/code/limits.py deleted file mode 100755 index 501aa81..0000000 --- a/code/limits.py +++ /dev/null @@ -1,1054 +0,0 @@ -#!/usr/bin/env python3 - -import asyncio -import json -import uuid -import websockets -import requests -from urllib.parse import quote -from aiortc import RTCPeerConnection, RTCSessionDescription, RTCIceCandidate, RTCConfiguration, RTCIceServer - -CHUNK_SIZE = 7168 -HEADER_SIZE = 1024 - -class ChunkedReceiver: - def __init__(self): - self.buffers = {} - self.completed = {} - - def handle_chunk(self, packet): - if len(packet) < HEADER_SIZE: - return None - - header_bytes = packet[:HEADER_SIZE].rstrip(b'\x00') - chunk_data = packet[HEADER_SIZE:] - - try: - header = json.loads(header_bytes) - except: - return None - - tid = header["tid"] - idx = header["idx"] - total = header["total"] - - if tid not in self.buffers: - self.buffers[tid] = {"chunks": {}, "total": total, "received": 0} - - buf = self.buffers[tid] - - if idx not in buf["chunks"]: - buf["chunks"][idx] = chunk_data - buf["received"] += 1 - - if buf["received"] == buf["total"]: - complete = b"".join(buf["chunks"][i] for i in range(buf["total"])) - self.completed[tid] = complete - del self.buffers[tid] - return tid - - return None - -def chunk_data(data, transfer_id): - total_size = len(data) - chunk_count = (total_size + CHUNK_SIZE - 1) // CHUNK_SIZE - packets = [] - - for i in range(chunk_count): - start = i * CHUNK_SIZE - end = min(start + CHUNK_SIZE, total_size) - chunk = data[start:end] - - header = {"tid": transfer_id, "idx": i, "total": chunk_count, "size": total_size} - header_json = json.dumps(header).encode() - header_padded = header_json.ljust(HEADER_SIZE, b'\x00') - - packets.append(header_padded + chunk) - - return packets - -CONFERENCE_ID = "75047680642749" -CONFERENCE_URL = f"https://telemost.yandex.ru/j/{CONFERENCE_ID}" -API_BASE = "https://cloud-api.yandex.ru/telemost_front/v2/telemost" - -def generate_uuid(): - return str(uuid.uuid4()) - -def get_connection_info(display_name): - url = f"{API_BASE}/conferences/{quote(CONFERENCE_URL, safe='')}/connection" - params = { - "next_gen_media_platform_allowed": "true", - "display_name": display_name, - "waiting_room_supported": "true" - } - - headers = { - "User-Agent": "Mozilla/5.0 (X11; Linux x86_64; rv:149.0) Gecko/20100101 Firefox/149.0", - "Accept": "*/*", - "content-type": "application/json", - "Client-Instance-Id": generate_uuid(), - "X-Telemost-Client-Version": "187.1.0", - "idempotency-key": generate_uuid(), - "Origin": "https://telemost.yandex.ru", - "Referer": "https://telemost.yandex.ru/" - } - - response = requests.get(url, params=params, headers=headers) - response.raise_for_status() - return response.json() - -async def check_datachannel_limits(): - limits = { - "supported": False, - "sctp_port": None, - "max_message_size": None, - "max_message_size_mb": None, - "ordered_supported": None, - "protocol": None - } - - try: - conn_info = get_connection_info("LimitCheck") - ws_url = conn_info["client_configuration"]["media_server_url"] - - async with websockets.connect(ws_url) as ws: - hello = { - "uid": generate_uuid(), - "hello": { - "participantMeta": {"name": "LimitCheck", "role": "SPEAKER", "sendAudio": False, "sendVideo": False}, - "participantAttributes": {"name": "LimitCheck", "role": "SPEAKER"}, - "sendAudio": False, "sendVideo": False, "sendSharing": False, - "participantId": conn_info["peer_id"], - "roomId": conn_info["room_id"], - "serviceName": "telemost", - "credentials": conn_info["credentials"], - "capabilitiesOffer": {"offerAnswerMode": ["SEPARATE"], "initialSubscriberOffer": ["ON_HELLO"]}, - "sdkInfo": {"implementation": "python", "version": "1.0.0"}, - "sdkInitializationId": generate_uuid(), - "disablePublisher": False, "disableSubscriber": False - } - } - await ws.send(json.dumps(hello)) - - for _ in range(10): - data = json.loads(await ws.recv()) - if "serverHello" in data: - await ws.send(json.dumps({"uid": data["uid"], "ack": {"status": {"code": "OK"}}})) - if "subscriberSdpOffer" in data: - sdp = data["subscriberSdpOffer"]["sdp"] - if "m=application" in sdp and "SCTP" in sdp: - limits["supported"] = True - limits["protocol"] = "SCTP over DTLS" - for line in sdp.split('\r\n'): - if 'sctp-port:' in line: - limits["sctp_port"] = int(line.split(':')[1].strip()) - if 'max-message-size:' in line: - size = int(line.split(':')[1].strip()) - limits["max_message_size"] = size - limits["max_message_size_mb"] = size / 1024 / 1024 - limits["ordered_supported"] = True - break - except Exception as e: - limits["error"] = str(e) - - return limits - -async def check_conference_limits(): - limits = { - "max_participants": None, - "session_timeout_ms": None, - "session_timeout_min": None, - "reconnect_wait_ms": None, - "media_platform": None, - "connection_type": None - } - - try: - conn_info = get_connection_info("ConfCheck") - limits["max_participants"] = conn_info.get("conference_limit") - limits["connection_type"] = conn_info.get("connection_type") - limits["media_platform"] = conn_info.get("media_platform") - - client_config = conn_info.get("client_configuration", {}) - limits["session_timeout_ms"] = client_config.get("goloom_session_open_ms") - if limits["session_timeout_ms"]: - limits["session_timeout_min"] = limits["session_timeout_ms"] / 1000 / 60 - limits["reconnect_wait_ms"] = client_config.get("wait_time_to_reconnect_ms") - except Exception as e: - limits["error"] = str(e) - - return limits - -async def check_audio_limits(): - limits = { - "codecs": [], - "opus_config": {}, - "red_supported": False - } - - try: - conn_info = get_connection_info("AudioCheck") - ws_url = conn_info["client_configuration"]["media_server_url"] - - async with websockets.connect(ws_url) as ws: - hello = { - "uid": generate_uuid(), - "hello": { - "participantMeta": {"name": "AudioCheck", "role": "SPEAKER", "sendAudio": False, "sendVideo": False}, - "participantAttributes": {"name": "AudioCheck", "role": "SPEAKER"}, - "sendAudio": False, "sendVideo": False, "sendSharing": False, - "participantId": conn_info["peer_id"], - "roomId": conn_info["room_id"], - "serviceName": "telemost", - "credentials": conn_info["credentials"], - "capabilitiesOffer": {"offerAnswerMode": ["SEPARATE"], "initialSubscriberOffer": ["ON_HELLO"]}, - "sdkInfo": {"implementation": "python", "version": "1.0.0"}, - "sdkInitializationId": generate_uuid(), - "disablePublisher": False, "disableSubscriber": False - } - } - await ws.send(json.dumps(hello)) - - for _ in range(10): - data = json.loads(await ws.recv()) - if "serverHello" in data: - await ws.send(json.dumps({"uid": data["uid"], "ack": {"status": {"code": "OK"}}})) - if "subscriberSdpOffer" in data: - sdp = data["subscriberSdpOffer"]["sdp"] - for line in sdp.split('\r\n'): - if line.startswith('a=rtpmap:') and 'opus' in line.lower(): - parts = line.split() - codec_info = parts[1].split('/') - limits["codecs"].append({ - "name": codec_info[0], - "rate": int(codec_info[1]) if len(codec_info) > 1 else None, - "channels": int(codec_info[2]) if len(codec_info) > 2 else None - }) - if line.startswith('a=fmtp:') and 'opus' in sdp[max(0, sdp.find(line)-200):sdp.find(line)].lower(): - params = line.split(':', 1)[1].strip().split(';') - for param in params: - if '=' in param: - key, val = param.strip().split('=') - limits["opus_config"][key] = val - if 'red' in line.lower() and 'rtpmap' in line: - limits["red_supported"] = True - break - except Exception as e: - limits["error"] = str(e) - - return limits - -async def check_video_limits(): - limits = { - "codecs": [], - "simulcast_supported": False, - "bandwidth_layers": {} - } - - try: - conn_info = get_connection_info("VideoCheck") - ws_url = conn_info["client_configuration"]["media_server_url"] - - async with websockets.connect(ws_url) as ws: - hello = { - "uid": generate_uuid(), - "hello": { - "participantMeta": {"name": "VideoCheck", "role": "SPEAKER", "sendAudio": False, "sendVideo": False}, - "participantAttributes": {"name": "VideoCheck", "role": "SPEAKER"}, - "sendAudio": False, "sendVideo": False, "sendSharing": False, - "participantId": conn_info["peer_id"], - "roomId": conn_info["room_id"], - "serviceName": "telemost", - "credentials": conn_info["credentials"], - "capabilitiesOffer": { - "offerAnswerMode": ["SEPARATE"], - "initialSubscriberOffer": ["ON_HELLO"], - "simulcastMode": ["STATIC"] - }, - "sdkInfo": {"implementation": "python", "version": "1.0.0"}, - "sdkInitializationId": generate_uuid(), - "disablePublisher": False, "disableSubscriber": False - } - } - await ws.send(json.dumps(hello)) - - for _ in range(10): - data = json.loads(await ws.recv()) - if "serverHello" in data: - caps = data["serverHello"].get("capabilitiesAnswer", {}) - limits["simulcast_supported"] = caps.get("simulcastMode") in ["STATIC", "DYNAMIC"] - await ws.send(json.dumps({"uid": data["uid"], "ack": {"status": {"code": "OK"}}})) - if "subscriberSdpOffer" in data: - sdp = data["subscriberSdpOffer"]["sdp"] - codec_names = set() - for line in sdp.split('\r\n'): - if line.startswith('a=rtpmap:') and 'm=video' in sdp[:sdp.find(line)]: - parts = line.split() - if len(parts) >= 2: - codec_info = parts[1].split('/') - codec_name = codec_info[0].upper() - if codec_name not in codec_names and codec_name not in ['RTX', 'RED', 'ULPFEC']: - codec_names.add(codec_name) - limits["codecs"].append(codec_name) - break - - limits["bandwidth_layers"] = { - "L1": {"single": "1000 kbps"}, - "L2": {"low": "120 kbps", "med": "360 kbps"}, - "L3": {"low": "120 kbps", "med": "360 kbps", "hi": "800 kbps"}, - "L4": {"low": "120 kbps", "med": "360 kbps", "hi": "800 kbps", "ultra": "1000 kbps"} - } - except Exception as e: - limits["error"] = str(e) - - return limits - -async def check_ice_limits(): - limits = { - "stun_servers": [], - "turn_servers": [], - "ping_interval_ms": None, - "ack_timeout_ms": None - } - - try: - conn_info = get_connection_info("ICECheck") - ws_url = conn_info["client_configuration"]["media_server_url"] - - ice_servers = conn_info["client_configuration"].get("ice_servers", []) - for server in ice_servers: - urls = server.get("urls", []) - for url in urls: - if url.startswith("stun:"): - limits["stun_servers"].append(url) - elif url.startswith("turn:"): - limits["turn_servers"].append(url) - - async with websockets.connect(ws_url) as ws: - hello = { - "uid": generate_uuid(), - "hello": { - "participantMeta": {"name": "ICECheck", "role": "SPEAKER", "sendAudio": False, "sendVideo": False}, - "participantAttributes": {"name": "ICECheck", "role": "SPEAKER"}, - "sendAudio": False, "sendVideo": False, "sendSharing": False, - "participantId": conn_info["peer_id"], - "roomId": conn_info["room_id"], - "serviceName": "telemost", - "credentials": conn_info["credentials"], - "capabilitiesOffer": {"offerAnswerMode": ["SEPARATE"], "initialSubscriberOffer": ["ON_HELLO"]}, - "sdkInfo": {"implementation": "python", "version": "1.0.0"}, - "sdkInitializationId": generate_uuid(), - "disablePublisher": False, "disableSubscriber": False - } - } - await ws.send(json.dumps(hello)) - - for _ in range(10): - data = json.loads(await ws.recv()) - if "serverHello" in data: - ping_config = data["serverHello"].get("pingPongConfiguration", {}) - limits["ping_interval_ms"] = ping_config.get("pingInterval") - limits["ack_timeout_ms"] = ping_config.get("ackTimeout") - - rtc_config = data["serverHello"].get("rtcConfiguration", {}) - ice_servers_full = rtc_config.get("iceServers", []) - for server in ice_servers_full: - urls = server.get("urls", []) - for url in urls: - if url.startswith("stun:") and url not in limits["stun_servers"]: - limits["stun_servers"].append(url) - elif url.startswith("turn:") and url not in limits["turn_servers"]: - limits["turn_servers"].append(url) - break - except Exception as e: - limits["error"] = str(e) - - return limits - -async def create_test_peer(name, is_server=False, sctp_tweaks=None): - conn_info = get_connection_info(name) - room_id = conn_info["room_id"] - peer_id = conn_info["peer_id"] - credentials = conn_info["credentials"] - ws_url = conn_info["client_configuration"]["media_server_url"] - - pc_sub = RTCPeerConnection(RTCConfiguration( - iceServers=[RTCIceServer(urls=["stun:stun.rtc.yandex.net:3478"])] - )) - - pc_pub = RTCPeerConnection(RTCConfiguration( - iceServers=[RTCIceServer(urls=["stun:stun.rtc.yandex.net:3478"])] - )) - - dc_pub = pc_pub.createDataChannel("limits_test", ordered=True) - dc_pub_open = asyncio.Event() - - stats = { - "sent": 0, - "received": 0, - "bytes_sent": 0, - "bytes_received": 0, - "errors": [], - "last_received": None, - "events": [] - } - - @dc_pub.on("open") - def on_pub_open(): - stats["events"].append(f"[{name}] Publisher DC opened") - dc_pub_open.set() - - @dc_pub.on("message") - def on_pub_msg(msg): - stats["events"].append(f"[{name}] Publisher DC received {len(msg)}b") - stats["received"] += 1 - stats["bytes_received"] += len(msg) - stats["last_received"] = len(msg) - - @dc_pub.on("error") - def on_pub_error(error): - stats["events"].append(f"[{name}] Publisher DC error: {error}") - stats["errors"].append(f"Pub DC error: {error}") - - @pc_sub.on("datachannel") - def on_sub_dc(channel): - stats["events"].append(f"[{name}] Subscriber DC created: {channel.label}") - - @channel.on("open") - def on_sub_open(): - stats["events"].append(f"[{name}] Subscriber DC opened") - - @channel.on("message") - def on_message(message): - stats["events"].append(f"[{name}] Subscriber DC received {len(message)}b") - stats["received"] += 1 - stats["bytes_received"] += len(message) - stats["last_received"] = len(message) - - if is_server: - try: - stats["events"].append(f"[{name}] Echoing {len(message)}b back...") - dc_pub.send(message) - stats["sent"] += 1 - stats["bytes_sent"] += len(message) - stats["events"].append(f"[{name}] Echo sent successfully") - except Exception as e: - stats["events"].append(f"[{name}] Echo failed: {e}") - stats["errors"].append(f"Send error: {str(e)}") - - @channel.on("error") - def on_sub_error(error): - stats["events"].append(f"[{name}] Subscriber DC error: {error}") - stats["errors"].append(f"Sub DC error: {error}") - - ws = await websockets.connect(ws_url) - - hello_msg = { - "uid": generate_uuid(), - "hello": { - "participantMeta": {"name": name, "role": "SPEAKER", "sendAudio": False, "sendVideo": False}, - "participantAttributes": {"name": name, "role": "SPEAKER"}, - "sendAudio": False, - "sendVideo": False, - "sendSharing": False, - "participantId": peer_id, - "roomId": room_id, - "serviceName": "telemost", - "credentials": credentials, - "capabilitiesOffer": { - "offerAnswerMode": ["SEPARATE"], - "initialSubscriberOffer": ["ON_HELLO"], - "slotsMode": ["FROM_CONTROLLER"], - "simulcastMode": ["DISABLED"], - "selfVadStatus": ["FROM_SERVER"], - "dataChannelSharing": ["TO_RTP"] - }, - "sdkInfo": {"implementation": "python", "version": "1.0.0", "userAgent": f"LimitsTest-{name}"}, - "sdkInitializationId": generate_uuid(), - "disablePublisher": False, - "disableSubscriber": False - } - } - - await ws.send(json.dumps(hello_msg)) - - publisher_sdp_sent = False - - async def ws_handler(): - nonlocal publisher_sdp_sent - while True: - try: - data = json.loads(await ws.recv()) - - if "serverHello" in data: - await ws.send(json.dumps({"uid": data["uid"], "ack": {"status": {"code": "OK"}}})) - - if "subscriberSdpOffer" in data and not publisher_sdp_sent: - await pc_sub.setRemoteDescription(RTCSessionDescription( - sdp=data["subscriberSdpOffer"]["sdp"], type="offer" - )) - - answer = await pc_sub.createAnswer() - await pc_sub.setLocalDescription(answer) - - await ws.send(json.dumps({ - "uid": generate_uuid(), - "subscriberSdpAnswer": { - "pcSeq": data["subscriberSdpOffer"]["pcSeq"], - "sdp": pc_sub.localDescription.sdp - } - })) - - await ws.send(json.dumps({"uid": data["uid"], "ack": {"status": {"code": "OK"}}})) - await asyncio.sleep(0.3) - - pub_offer = await pc_pub.createOffer() - await pc_pub.setLocalDescription(pub_offer) - - sdp_modified = pc_pub.localDescription.sdp - if sctp_tweaks: - if 'max_message_size' in sctp_tweaks: - sdp_modified = sdp_modified.replace( - 'a=max-message-size:', - f'a=max-message-size:{sctp_tweaks["max_message_size"]}\r\na=old-max-message-size:' - ) - if 'sctp_buf' in sctp_tweaks: - if 'a=sctp-port:' in sdp_modified: - sdp_modified = sdp_modified.replace( - 'a=sctp-port:', - f'a=sctpmap:5000 webrtc-datachannel {sctp_tweaks["sctp_buf"]}\r\na=sctp-port:' - ) - - await ws.send(json.dumps({ - "uid": generate_uuid(), - "publisherSdpOffer": { - "pcSeq": 1, - "sdp": sdp_modified - } - })) - publisher_sdp_sent = True - - if "publisherSdpAnswer" in data: - await pc_pub.setRemoteDescription(RTCSessionDescription( - sdp=data["publisherSdpAnswer"]["sdp"], type="answer" - )) - await ws.send(json.dumps({"uid": data["uid"], "ack": {"status": {"code": "OK"}}})) - - if "webrtcIceCandidate" in data: - cand = data["webrtcIceCandidate"] - try: - parts = cand["candidate"].split() - if len(parts) >= 8: - ice = RTCIceCandidate( - component=int(parts[1]), - foundation=parts[0].replace("candidate:", ""), - ip=parts[4], - port=int(parts[5]), - priority=int(parts[3]), - protocol=parts[2], - type=parts[7], - sdpMid=cand["sdpMid"], - sdpMLineIndex=cand["sdpMlineIndex"] - ) - - if cand.get("target") == "SUBSCRIBER": - await pc_sub.addIceCandidate(ice) - elif cand.get("target") == "PUBLISHER": - await pc_pub.addIceCandidate(ice) - except: - pass - - except: - break - - @pc_sub.on("icecandidate") - async def on_sub_ice(event): - if event.candidate: - await ws.send(json.dumps({ - "uid": generate_uuid(), - "webrtcIceCandidate": { - "candidate": event.candidate.candidate, - "sdpMid": event.candidate.sdpMid, - "sdpMlineIndex": event.candidate.sdpMLineIndex, - "target": "SUBSCRIBER", - "pcSeq": 1 - } - })) - - @pc_pub.on("icecandidate") - async def on_pub_ice(event): - if event.candidate: - await ws.send(json.dumps({ - "uid": generate_uuid(), - "webrtcIceCandidate": { - "candidate": event.candidate.candidate, - "sdpMid": event.candidate.sdpMid, - "sdpMlineIndex": event.candidate.sdpMLineIndex, - "target": "PUBLISHER", - "pcSeq": 1 - } - })) - - ws_task = asyncio.create_task(ws_handler()) - - return { - "name": name, - "dc_pub": dc_pub, - "dc_pub_open": dc_pub_open, - "stats": stats, - "ws": ws, - "ws_task": ws_task, - "pc_sub": pc_sub, - "pc_pub": pc_pub - } - -async def test_message_size_limits(max_size): - print("\n[REAL TEST] Testing message size limits...") - - try: - server = await create_test_peer("LimitServer", is_server=True) - await asyncio.wait_for(server["dc_pub_open"].wait(), timeout=10.0) - print(" :P Server ready") - - client = await create_test_peer("LimitClient", is_server=False) - await asyncio.wait_for(client["dc_pub_open"].wait(), timeout=10.0) - print(" :P Client ready") - - await asyncio.sleep(3) - - sizes_kb = [1, 6, 8, 10, 12, 14, 16, 18, 20, 24, 28, 32] - tests = [(f"{kb}KB", kb * 1024, 3, 2) for kb in sizes_kb] - - results = [] - last_success_size = 0 - - for test_name, size, send_wait, echo_wait in tests: - client["stats"]["last_received"] = None - server["stats"]["last_received"] = None - client["stats"]["events"].clear() - server["stats"]["events"].clear() - - try: - print(f"\n -> Sending {test_name}...") - - dc = client["dc_pub"] - print(f" -> DC state: {dc.readyState}") - print(f" -> DC bufferedAmount before: {dc.bufferedAmount}") - - data = "X" * size - - print(f" -> Calling send() for {size:,} bytes...") - try: - dc.send(data) - client["stats"]["sent"] += 1 - client["stats"]["bytes_sent"] += len(data) - print(f" -> send() returned successfully") - except Exception as send_err: - print(f" X send() raised exception: {send_err}") - results.append((test_name, False, f"Send exception: {send_err}")) - continue - - print(f" -> DC bufferedAmount after: {dc.bufferedAmount}") - print(f" -> Waiting {send_wait}s for delivery...") - - for i in range(send_wait): - await asyncio.sleep(1) - print(f" -> {i+1}s... buffered: {dc.bufferedAmount}, server events: {len(server['stats']['events'])}") - if server["stats"]["last_received"]: - print(f" -> Server received after {i+1}s!") - break - - server_received = server["stats"]["last_received"] - - if server_received: - print(f" -> Server got {server_received} bytes, waiting {echo_wait}s for echo...") - for i in range(echo_wait): - await asyncio.sleep(1) - if client["stats"]["last_received"]: - print(f" -> Client received echo after {i+1}s!") - break - print(f" -> {i+1}s... (client: {len(client['stats']['events'])} events)") - else: - print(f" -> Server: NO DATA") - print(f" -> Final bufferedAmount: {dc.bufferedAmount}") - - client_received = client["stats"]["last_received"] - - if client_received: - print(f" :P {test_name} ({size:,}b): SUCCESS") - results.append((test_name, True, None)) - last_success_size = size - elif server_received: - if server["stats"]["errors"]: - print(f" X {test_name}: Echo failed - {server['stats']['errors'][-1]}") - results.append((test_name, False, server['stats']['errors'][-1])) - else: - print(f" X {test_name}: Server got it, but no echo back") - results.append((test_name, False, "No echo received")) - else: - print(f" X {test_name}: Never reached server") - results.append((test_name, False, "Never reached server")) - break - - except Exception as e: - print(f" X {test_name} ({size:,}b): TEST FAILED - {str(e)}") - results.append((test_name, False, f"Test error: {str(e)[:30]}")) - break - - if last_success_size > 0: - print(f"\n :P Max working size: {last_success_size:,} bytes ({last_success_size/1024:.0f}KB)") - - server["ws_task"].cancel() - client["ws_task"].cancel() - await server["ws"].close() - await client["ws"].close() - await server["pc_sub"].close() - await server["pc_pub"].close() - await client["pc_sub"].close() - await client["pc_pub"].close() - - return results - - except Exception as e: - print(f" X Standard test failed: {e}") - return [] - -async def test_throughput_limits(): - print("\n[REAL TEST] Testing throughput limits...") - - try: - server = await create_test_peer("ThroughputServer", is_server=True) - await asyncio.wait_for(server["dc_pub_open"].wait(), timeout=10.0) - - client = await create_test_peer("ThroughputClient", is_server=False) - await asyncio.wait_for(client["dc_pub_open"].wait(), timeout=10.0) - print(" :P Peers ready") - - await asyncio.sleep(3) - - msg_size = 1024 - msg_count = 50 - data = "X" * msg_size - - start_time = asyncio.get_event_loop().time() - - for i in range(msg_count): - try: - client["dc_pub"].send(data) - client["stats"]["sent"] += 1 - client["stats"]["bytes_sent"] += len(data) - await asyncio.sleep(0.05) - except Exception as e: - print(f" X Send failed at message {i}: {e}") - break - - await asyncio.sleep(3) - - end_time = asyncio.get_event_loop().time() - duration = end_time - start_time - - total_bytes = client["stats"]["bytes_sent"] - throughput_kbps = (total_bytes * 8) / (duration * 1_000) - - print(f" :P Sent: {client['stats']['sent']} messages") - print(f" :P Received: {server['stats']['received']} messages") - print(f" :P Throughput: {throughput_kbps:.2f} Kbps") - print(f" :P Duration: {duration:.2f}s") - - server["ws_task"].cancel() - client["ws_task"].cancel() - await server["ws"].close() - await client["ws"].close() - await server["pc_sub"].close() - await server["pc_pub"].close() - await client["pc_sub"].close() - await client["pc_pub"].close() - - success = server['stats']['received'] > 0 - return [("Throughput test", success, f"{throughput_kbps:.2f} Kbps" if success else "No response")] - - except Exception as e: - print(f" X Test failed: {e}") - return [("Throughput test", False, str(e))] - -async def test_chunked_transfer(): - print("\n[REAL TEST] Testing large data transfer (app-level chunking)...") - - try: - server = await create_test_peer("BigServer", is_server=True) - await asyncio.wait_for(server["dc_pub_open"].wait(), timeout=10.0) - - client = await create_test_peer("BigClient", is_server=False) - await asyncio.wait_for(client["dc_pub_open"].wait(), timeout=10.0) - print(" :P Peers ready") - - await asyncio.sleep(3) - - test_sizes = [ - ("64KB", 64 * 1024), - ("128KB", 128 * 1024), - ("256KB", 256 * 1024), - ] - - results = [] - chunk_size = 8192 - - for name, total_size in test_sizes: - server["stats"]["bytes_received"] = 0 - server["stats"]["received"] = 0 - - chunks_needed = (total_size + chunk_size - 1) // chunk_size - - print(f"\n -> Sending {name} ({total_size:,}b) as {chunks_needed} x 8KB chunks...") - - start_time = asyncio.get_event_loop().time() - - try: - for i in range(chunks_needed): - chunk = ("X" * chunk_size) - - while client["dc_pub"].bufferedAmount > chunk_size * 2: - await asyncio.sleep(0.001) - - client["dc_pub"].send(chunk) - - await asyncio.sleep(2) - - end_time = asyncio.get_event_loop().time() - duration_ms = (end_time - start_time) * 1000 - - received = server["stats"]["bytes_received"] - - if received >= total_size: - throughput_kbps = (total_size * 8) / duration_ms - print(f" :P {name}: SUCCESS in {duration_ms:.0f}ms ({throughput_kbps:.1f} Kbps)") - results.append((name, True, f"{duration_ms:.0f}ms")) - else: - print(f" X {name}: PARTIAL ({received:,}b / {total_size:,}b)") - results.append((name, False, f"{received}b")) - break - - except Exception as e: - print(f" X {name}: ERROR - {e}") - results.append((name, False, str(e)[:30])) - break - - server["ws_task"].cancel() - client["ws_task"].cancel() - await server["ws"].close() - await client["ws"].close() - await server["pc_sub"].close() - await server["pc_pub"].close() - await client["pc_sub"].close() - await client["pc_pub"].close() - - return results - - except Exception as e: - print(f" X Test failed: {e}") - return [("Large transfer", False, str(e))] - -async def test_latency_microbench(): - print("\n[REAL TEST] Testing latency microbenchmarks...") - - try: - server = await create_test_peer("LatencyServer", is_server=True) - await asyncio.wait_for(server["dc_pub_open"].wait(), timeout=10.0) - - client = await create_test_peer("LatencyClient", is_server=False) - await asyncio.wait_for(client["dc_pub_open"].wait(), timeout=10.0) - print(" :P Latency test peers ready") - - await asyncio.sleep(3) - - test_cases = [ - ("Tiny (100b)", 100), - ("Small (1KB)", 1024), - ("Medium (4KB)", 4096), - ("Large (8KB)", 8192), - ] - - results = [] - - for name, size in test_cases: - latencies = [] - data = "L" * size - - print(f"\n -> Testing {name}...") - - for i in range(10): - client["stats"]["last_received"] = None - server["stats"]["last_received"] = None - - send_time = asyncio.get_event_loop().time() - client["dc_pub"].send(data) - - for _ in range(50): - await asyncio.sleep(0.01) - if client["stats"]["last_received"]: - recv_time = asyncio.get_event_loop().time() - rtt_us = (recv_time - send_time) * 1_000_000 - latencies.append(rtt_us) - break - - if latencies: - avg_lat = sum(latencies) / len(latencies) - min_lat = min(latencies) - max_lat = max(latencies) - - print(f" :P {name}: avg={avg_lat:.0f}µs, min={min_lat:.0f}µs, max={max_lat:.0f}µs") - results.append((name, True, f"{avg_lat:.0f}µs")) - else: - print(f" X {name}: No responses") - results.append((name, False, "No response")) - - server["ws_task"].cancel() - client["ws_task"].cancel() - await server["ws"].close() - await client["ws"].close() - await server["pc_sub"].close() - await server["pc_pub"].close() - await client["pc_sub"].close() - await client["pc_pub"].close() - - return results - - except Exception as e: - print(f" X Latency test failed: {e}") - return [("Latency test", False, str(e))] - -async def test_participant_limit(max_participants): - print("\n[REAL TEST] Testing participant limit...") - - peers = [] - try: - for i in range(min(3, max_participants)): - try: - peer = await create_test_peer(f"Peer{i+1}", is_server=False) - await asyncio.wait_for(peer["dc_pub_open"].wait(), timeout=10.0) - peers.append(peer) - print(f" :P Peer {i+1}/3 connected") - await asyncio.sleep(1) - except Exception as e: - print(f" X Peer {i+1} failed: {str(e)[:30]}") - break - - print(f" :P Successfully connected {len(peers)}/{min(3, max_participants)} peers") - - for peer in peers: - peer["ws_task"].cancel() - await peer["ws"].close() - await peer["pc_sub"].close() - await peer["pc_pub"].close() - - return [("Multi-peer test", True, f"{len(peers)} peers")] - - except Exception as e: - print(f" X Test failed: {e}") - return [("Multi-peer test", False, str(e))] - -async def check_all_limits(): - print(r""" - Yandex Telemost - Limits Check - Full verification of documented limits - by zowue for olc -""") - - print("[1/5] Checking DataChannel limits...") - dc_limits = await check_datachannel_limits() - if dc_limits['supported']: - print(f" :P DataChannel: SUPPORTED") - print(f" :P Protocol: {dc_limits['protocol']}") - print(f" :P SCTP Port: {dc_limits['sctp_port']}") - print(f" :P Max Message: {dc_limits['max_message_size_mb']:.0f}MB") - print(f" :P Ordered: YES") - else: - print(f" X DataChannel: NOT SUPPORTED") - - print("\n[2/5] Checking conference limits...") - conf_limits = await check_conference_limits() - print(f" :P Max Participants: {conf_limits['max_participants']}") - print(f" :P Media Platform: {conf_limits['media_platform']}") - print(f" :P Session Timeout: {conf_limits['session_timeout_min']:.1f}min") - print(f" :P Reconnect Wait: {conf_limits['reconnect_wait_ms']}ms") - - print("\n[3/5] Checking audio limits...") - audio_limits = await check_audio_limits() - if audio_limits['codecs']: - codec = audio_limits['codecs'][0] - print(f" :P Codec: {codec['name']} @ {codec['rate']//1000}kHz") - print(f" :P Channels: {codec['channels']} (stereo)") - if audio_limits['opus_config']: - print(f" :P FEC: {'YES' if audio_limits['opus_config'].get('useinbandfec') == '1' else 'NO'}") - print(f" :P DTX: {'YES' if audio_limits['opus_config'].get('usedtx') == '1' else 'NO'}") - print(f" :P RED: {'YES' if audio_limits['red_supported'] else 'NO'}") - - print("\n[4/5] Checking video limits...") - video_limits = await check_video_limits() - codecs_clean = [c for c in video_limits['codecs'] if c in ['VP8', 'VP9', 'H264', 'AV1']] - print(f" :P Codecs: {', '.join(codecs_clean)}") - print(f" :P Simulcast: {'YES' if video_limits['simulcast_supported'] else 'NO'}") - print(f" :P Layers: L1-L4 (120kbps - 1000kbps)") - - print("\n[5/5] Checking ICE/network limits...") - ice_limits = await check_ice_limits() - print(f" :P STUN Servers: {len(ice_limits['stun_servers'])}") - print(f" :P TURN Servers: {len(ice_limits['turn_servers'])}") - print(f" :P Ping Interval: {ice_limits['ping_interval_ms']}ms") - print(f" :P ACK Timeout: {ice_limits['ack_timeout_ms']}ms") - - test_results = [] - if dc_limits['supported'] and dc_limits['max_message_size']: - test_results = await test_message_size_limits(dc_limits['max_message_size']) - test_results.extend(await test_latency_microbench()) - test_results.extend(await test_throughput_limits()) - test_results.extend(await test_chunked_transfer()) - - if conf_limits['max_participants']: - test_results.extend(await test_participant_limit(conf_limits['max_participants'])) - - print(r""" - VERIFICATION RESULTS -""") - - doc_claims = { - "DataChannel max size": (1073741823, dc_limits.get('max_message_size')), - "SCTP port": (5000, dc_limits.get('sctp_port')), - "Max participants": (40, conf_limits.get('max_participants')), - "Session timeout": (120000, conf_limits.get('session_timeout_ms')), - "Ping interval": (5000, ice_limits.get('ping_interval_ms')), - "ACK timeout": (9000, ice_limits.get('ack_timeout_ms')) - } - - all_match = True - for claim, (expected, actual) in doc_claims.items(): - match = expected == actual - status = ":P" if match else "X" - print(f" - {claim}: {status} {actual} {'(OK)' if match else f'(expected {expected})'}") - if not match: - all_match = False - - if test_results: - print("\nReal Transfer Tests:") - for test_name, success, error in test_results: - status = ":P" if success else "X" - print(f" - {test_name}: {status} {'OK' if success else error[:30]}") - - if all_match: - print("\n :P ALL LIMITS VERIFIED - Documentation is accurate!") - else: - print("\n X MISMATCH DETECTED - Some limits differ from docs") - - return { - "datachannel": dc_limits, - "conference": conf_limits, - "audio": audio_limits, - "video": video_limits, - "ice": ice_limits - } - -if __name__ == "__main__": - try: - asyncio.run(check_all_limits()) - except KeyboardInterrupt: - print("\n\nCheck interrupted.") diff --git a/code/olcrtc.py b/code/olcrtc.py deleted file mode 100644 index 5e06534..0000000 --- a/code/olcrtc.py +++ /dev/null @@ -1,573 +0,0 @@ -#!/usr/bin/env python3 - -# =========================================== -# AI GENERATED / AI GENERATED / AI GENERATED -# =========================================== - - -import asyncio -import json -import uuid -import struct -import socket -import logging -from urllib.parse import quote -import websockets -import requests -from aiortc import RTCPeerConnection, RTCSessionDescription, RTCIceCandidate, RTCConfiguration, RTCIceServer -from cryptography.hazmat.primitives.ciphers.aead import ChaCha20Poly1305 -import os - -logging.basicConfig(level=logging.INFO, format='[%(levelname)s] %(message)s') -log = logging.getLogger(__name__) - -logging.getLogger('aiortc').setLevel(logging.ERROR) -logging.getLogger('aioice').setLevel(logging.ERROR) -logging.getLogger('av').setLevel(logging.ERROR) - -API_BASE = "https://cloud-api.yandex.ru/telemost_front/v2/telemost" -CHUNK_SIZE = 7168 -BUFFER_THRESHOLD = 16384 - -def gen_uuid(): - return str(uuid.uuid4()) - -def get_connection_info(room_url, display_name): - url = f"{API_BASE}/conferences/{quote(room_url, safe='')}/connection" - params = { - "next_gen_media_platform_allowed": "true", - "display_name": display_name, - "waiting_room_supported": "true" - } - headers = { - "User-Agent": "Mozilla/5.0 (X11; Linux x86_64; rv:149.0) Gecko/20100101 Firefox/149.0", - "Accept": "*/*", - "content-type": "application/json", - "Client-Instance-Id": gen_uuid(), - "X-Telemost-Client-Version": "187.1.0", - "idempotency-key": gen_uuid(), - "Origin": "https://telemost.yandex.ru", - "Referer": "https://telemost.yandex.ru/" - } - r = requests.get(url, params=params, headers=headers) - r.raise_for_status() - return r.json() - -class Crypto: - def __init__(self, key): - self.cipher = ChaCha20Poly1305(key) - - def encrypt(self, data): - nonce = os.urandom(12) - ct = self.cipher.encrypt(nonce, data, None) - return nonce + ct - - def decrypt(self, blob): - nonce = blob[:12] - ct = blob[12:] - return self.cipher.decrypt(nonce, ct, None) - -class Multiplexer: - def __init__(self, on_send): - self.streams = {} - self.next_id = 1 - self.on_send = on_send - - def open_stream(self): - sid = self.next_id - self.next_id += 1 - self.streams[sid] = { - "recv_buf": b"", - "send_queue": asyncio.Queue(), - "closed": False - } - return sid - - def close_stream(self, sid): - if sid in self.streams: - self.streams[sid]["closed"] = True - - async def send_data(self, sid, data): - if sid not in self.streams or self.streams[sid]["closed"]: - return - - log.debug(f"MUX send sid={sid} len={len(data)}") - for i in range(0, len(data), CHUNK_SIZE): - chunk = data[i:i+CHUNK_SIZE] - frame = struct.pack("!HH", sid, len(chunk)) + chunk - await self.on_send(frame) - - async def send_close(self, sid): - frame = struct.pack("!HH", sid, 0) - await self.on_send(frame) - self.close_stream(sid) - - def handle_frame(self, frame): - if len(frame) < 4: - log.warning(f"MUX frame too short: {len(frame)}b") - return - - sid, length = struct.unpack("!HH", frame[:4]) - - if length == 0: - log.debug(f"MUX close sid={sid}") - self.close_stream(sid) - return - - data = frame[4:4+length] - - if sid not in self.streams: - log.warning(f"MUX recv sid={sid} not found, opening it") - self.streams[sid] = { - "recv_buf": b"", - "send_queue": asyncio.Queue(), - "closed": False - } - - self.streams[sid]["recv_buf"] += data - log.debug(f"MUX recv sid={sid} len={len(data)} total_buf={len(self.streams[sid]['recv_buf'])}") - - def read_stream(self, sid, max_n=None): - if sid not in self.streams: - return b"" - - buf = self.streams[sid]["recv_buf"] - if not buf: - return b"" - - if max_n is None: - result = buf - self.streams[sid]["recv_buf"] = b"" - else: - result = buf[:max_n] - self.streams[sid]["recv_buf"] = buf[max_n:] - - return result - - def stream_closed(self, sid): - return sid not in self.streams or self.streams[sid]["closed"] - -class RTCPeer: - def __init__(self, room_url, name, crypto): - self.room_url = room_url - self.name = name - self.crypto = crypto - self.dc = None - self.dc_ready = asyncio.Event() - self.mux = None - - async def connect(self): - conn = get_connection_info(self.room_url, self.name) - room_id = conn["room_id"] - peer_id = conn["peer_id"] - credentials = conn["credentials"] - ws_url = conn["client_configuration"]["media_server_url"] - - pc_sub = RTCPeerConnection(RTCConfiguration( - iceServers=[RTCIceServer(urls=["stun:stun.rtc.yandex.net:3478"])] - )) - pc_pub = RTCPeerConnection(RTCConfiguration( - iceServers=[RTCIceServer(urls=["stun:stun.rtc.yandex.net:3478"])] - )) - - self.dc = pc_pub.createDataChannel("olcrtc", ordered=True) - - @self.dc.on("open") - def on_open(): - self.dc_ready.set() - - @self.dc.on("message") - def on_msg(msg): - if isinstance(msg, bytes): - try: - plain = self.crypto.decrypt(msg) - self.mux.handle_frame(plain) - log.debug(f"DC received {len(msg)}b encrypted, {len(plain)}b plain") - except Exception as e: - log.error(f"DC decrypt error: {e}") - - @pc_sub.on("datachannel") - def on_dc(ch): - log.info(f"Received datachannel: {ch.label}") - @ch.on("message") - def on_message(msg): - if isinstance(msg, bytes): - try: - plain = self.crypto.decrypt(msg) - self.mux.handle_frame(plain) - log.debug(f"SUB DC received {len(msg)}b encrypted, {len(plain)}b plain") - except Exception as e: - log.error(f"SUB DC decrypt error: {e}") - - ws = await websockets.connect(ws_url) - - hello = { - "uid": gen_uuid(), - "hello": { - "participantMeta": {"name": self.name, "role": "SPEAKER", "sendAudio": False, "sendVideo": False}, - "participantAttributes": {"name": self.name, "role": "SPEAKER"}, - "sendAudio": False, - "sendVideo": False, - "sendSharing": False, - "participantId": peer_id, - "roomId": room_id, - "serviceName": "telemost", - "credentials": credentials, - "capabilitiesOffer": { - "offerAnswerMode": ["SEPARATE"], - "initialSubscriberOffer": ["ON_HELLO"], - "slotsMode": ["FROM_CONTROLLER"], - "simulcastMode": ["DISABLED"], - "selfVadStatus": ["FROM_SERVER"], - "dataChannelSharing": ["TO_RTP"] - }, - "sdkInfo": {"implementation": "python", "version": "1.0.0", "userAgent": f"OlcRTC-{self.name}"}, - "sdkInitializationId": gen_uuid(), - "disablePublisher": False, - "disableSubscriber": False - } - } - - await ws.send(json.dumps(hello)) - - pub_sent = False - - async def ws_loop(): - nonlocal pub_sent - while True: - try: - msg = json.loads(await ws.recv()) - - if "serverHello" in msg: - await ws.send(json.dumps({"uid": msg["uid"], "ack": {"status": {"code": "OK"}}})) - - if "subscriberSdpOffer" in msg and not pub_sent: - await pc_sub.setRemoteDescription(RTCSessionDescription( - sdp=msg["subscriberSdpOffer"]["sdp"], type="offer" - )) - ans = await pc_sub.createAnswer() - await pc_sub.setLocalDescription(ans) - await ws.send(json.dumps({ - "uid": gen_uuid(), - "subscriberSdpAnswer": { - "pcSeq": msg["subscriberSdpOffer"]["pcSeq"], - "sdp": pc_sub.localDescription.sdp - } - })) - await ws.send(json.dumps({"uid": msg["uid"], "ack": {"status": {"code": "OK"}}})) - await asyncio.sleep(0.3) - - offer = await pc_pub.createOffer() - await pc_pub.setLocalDescription(offer) - await ws.send(json.dumps({ - "uid": gen_uuid(), - "publisherSdpOffer": { - "pcSeq": 1, - "sdp": pc_pub.localDescription.sdp - } - })) - pub_sent = True - - if "publisherSdpAnswer" in msg: - await pc_pub.setRemoteDescription(RTCSessionDescription( - sdp=msg["publisherSdpAnswer"]["sdp"], type="answer" - )) - await ws.send(json.dumps({"uid": msg["uid"], "ack": {"status": {"code": "OK"}}})) - - if "webrtcIceCandidate" in msg: - cand = msg["webrtcIceCandidate"] - try: - parts = cand["candidate"].split() - if len(parts) >= 8: - ice = RTCIceCandidate( - component=int(parts[1]), - foundation=parts[0].replace("candidate:", ""), - ip=parts[4], - port=int(parts[5]), - priority=int(parts[3]), - protocol=parts[2], - type=parts[7], - sdpMid=cand["sdpMid"], - sdpMLineIndex=cand["sdpMlineIndex"] - ) - if cand.get("target") == "SUBSCRIBER": - await pc_sub.addIceCandidate(ice) - elif cand.get("target") == "PUBLISHER": - await pc_pub.addIceCandidate(ice) - except: - pass - except: - break - - @pc_sub.on("icecandidate") - async def on_sub_ice(e): - if e.candidate: - await ws.send(json.dumps({ - "uid": gen_uuid(), - "webrtcIceCandidate": { - "candidate": e.candidate.candidate, - "sdpMid": e.candidate.sdpMid, - "sdpMlineIndex": e.candidate.sdpMLineIndex, - "target": "SUBSCRIBER", - "pcSeq": 1 - } - })) - - @pc_pub.on("icecandidate") - async def on_pub_ice(e): - if e.candidate: - await ws.send(json.dumps({ - "uid": gen_uuid(), - "webrtcIceCandidate": { - "candidate": e.candidate.candidate, - "sdpMid": e.candidate.sdpMid, - "sdpMlineIndex": e.candidate.sdpMLineIndex, - "target": "PUBLISHER", - "pcSeq": 1 - } - })) - - asyncio.create_task(ws_loop()) - - async def send_encrypted(data): - enc = self.crypto.encrypt(data) - while self.dc.bufferedAmount > BUFFER_THRESHOLD: - await asyncio.sleep(0.001) - self.dc.send(enc) - log.debug(f"DC sent {len(data)}b plain, {len(enc)}b encrypted") - - self.mux = Multiplexer(send_encrypted) - - await asyncio.wait_for(self.dc_ready.wait(), timeout=15.0) - -class SOCKS5Server: - def __init__(self, peer, host="127.0.0.1", port=1080): - self.peer = peer - self.host = host - self.port = port - - async def handle_client(self, reader, writer): - sid = None - try: - ver = await reader.readexactly(1) - if ver[0] != 5: - writer.close() - return - - nmethods = await reader.readexactly(1) - await reader.readexactly(nmethods[0]) - - writer.write(b"\x05\x00") - await writer.drain() - - req = await reader.readexactly(4) - if req[1] != 1: - writer.write(b"\x05\x07\x00\x01\x00\x00\x00\x00\x00\x00") - await writer.drain() - writer.close() - return - - atyp = req[3] - if atyp == 1: - addr = socket.inet_ntoa(await reader.readexactly(4)) - elif atyp == 3: - length = (await reader.readexactly(1))[0] - addr = (await reader.readexactly(length)).decode() - else: - writer.write(b"\x05\x08\x00\x01\x00\x00\x00\x00\x00\x00") - await writer.drain() - writer.close() - return - - port_bytes = await reader.readexactly(2) - port = struct.unpack("!H", port_bytes)[0] - - sid = self.peer.mux.open_stream() - log.info(f"SOCKS5 connect sid={sid} {addr}:{port}") - - connect_req = json.dumps({"cmd": "connect", "addr": addr, "port": port}).encode() - await self.peer.mux.send_data(sid, connect_req) - - await asyncio.sleep(0.5) - - writer.write(b"\x05\x00\x00\x01\x00\x00\x00\x00\x00\x00") - await writer.drain() - - async def client_to_stream(): - try: - while True: - data = await reader.read(4096) - if not data: - break - log.debug(f"SOCKS5 sid={sid} client->stream {len(data)}b") - await self.peer.mux.send_data(sid, data) - await self.peer.mux.send_close(sid) - log.debug(f"SOCKS5 sid={sid} client closed") - except Exception as e: - log.error(f"SOCKS5 sid={sid} client_to_stream error: {e}") - - async def stream_to_client(): - try: - while not self.peer.mux.stream_closed(sid): - await asyncio.sleep(0.01) - data = self.peer.mux.read_stream(sid) - if data: - log.debug(f"SOCKS5 sid={sid} stream->client {len(data)}b") - writer.write(data) - await writer.drain() - log.debug(f"SOCKS5 sid={sid} stream closed") - except Exception as e: - log.error(f"SOCKS5 sid={sid} stream_to_client error: {e}") - - await asyncio.gather(client_to_stream(), stream_to_client()) - - except Exception as e: - log.error(f"SOCKS5 sid={sid} error: {e}") - finally: - try: - writer.close() - await writer.wait_closed() - except: - pass - - async def run(self): - server = await asyncio.start_server(self.handle_client, self.host, self.port) - print(f"SOCKS5 proxy listening on {self.host}:{self.port}") - async with server: - await server.serve_forever() - -class ProxyServer: - def __init__(self, peer): - self.peer = peer - self.connections = {} - - async def handle_stream(self, sid, req): - try: - cmd = req.get("cmd") - if cmd == "connect": - addr = req["addr"] - port = req["port"] - - log.info(f"SERVER connect sid={sid} {addr}:{port}") - - try: - r, w = await asyncio.open_connection(addr, port) - self.connections[sid] = (r, w) - log.info(f"SERVER sid={sid} connected") - - async def remote_to_stream(): - try: - while True: - data = await r.read(4096) - if not data: - break - log.debug(f"SERVER sid={sid} remote->stream {len(data)}b") - await self.peer.mux.send_data(sid, data) - await self.peer.mux.send_close(sid) - log.debug(f"SERVER sid={sid} remote closed") - except Exception as e: - log.error(f"SERVER sid={sid} remote_to_stream error: {e}") - - asyncio.create_task(remote_to_stream()) - - except Exception as e: - log.error(f"SERVER sid={sid} connect failed: {e}") - await self.peer.mux.send_close(sid) - except Exception as e: - log.error(f"SERVER sid={sid} handle_stream error: {e}") - - async def run(self): - log.info("SERVER proxy loop started") - while True: - await asyncio.sleep(0.01) - for sid in list(self.peer.mux.streams.keys()): - data = self.peer.mux.read_stream(sid) - if data: - if sid in self.connections: - r, w = self.connections[sid] - try: - log.debug(f"SERVER sid={sid} stream->remote {len(data)}b") - w.write(data) - await w.drain() - except Exception as e: - log.error(f"SERVER sid={sid} write error: {e}") - await self.peer.mux.send_close(sid) - else: - try: - req = json.loads(data.decode()) - await self.handle_stream(sid, req) - except Exception as e: - log.error(f"SERVER sid={sid} parse error: {e}") - - if self.peer.mux.stream_closed(sid) and sid in self.connections: - log.debug(f"SERVER sid={sid} cleanup") - r, w = self.connections[sid] - try: - w.close() - await w.wait_closed() - except: - pass - del self.connections[sid] - -async def run_server(room_url, key): - crypto = Crypto(key) - peer = RTCPeer(room_url, "OlcRTC-Server", crypto) - - log.info("Connecting to Telemost...") - await peer.connect() - log.info("Connected to Telemost") - - proxy = ProxyServer(peer) - await proxy.run() - -async def run_client(room_url, key, socks_port): - crypto = Crypto(key) - peer = RTCPeer(room_url, "OlcRTC-Client", crypto) - - log.info("Connecting to Telemost...") - await peer.connect() - log.info("Connected to Telemost") - - socks = SOCKS5Server(peer, port=socks_port) - await socks.run() - -def main(): - import argparse - - parser = argparse.ArgumentParser(description="OlcRTC - SOCKS5 over WebRTC DataChannel") - parser.add_argument("--srv", action="store_true", help="Run as server") - parser.add_argument("--cnc", action="store_true", help="Run as client") - parser.add_argument("--id", required=True, help="Telemost room ID") - parser.add_argument("--provider", default="telemost", help="Provider (telemost only)") - parser.add_argument("--socks-port", type=int, default=1080, help="SOCKS5 port (client only)") - parser.add_argument("--key", help="Shared encryption key (hex)") - parser.add_argument("--debug", action="store_true", help="Enable debug logging") - - args = parser.parse_args() - - if args.debug: - logging.getLogger().setLevel(logging.DEBUG) - - if args.provider != "telemost": - log.error("Only telemost provider supported in MVP") - return - - room_url = f"https://telemost.yandex.ru/j/{args.id}" - - if args.key: - key = bytes.fromhex(args.key) - else: - key = os.urandom(32) - log.info(f"Generated key: {key.hex()}") - - if args.srv: - log.info(f"Starting server mode, room: {args.id}") - asyncio.run(run_server(room_url, key)) - elif args.cnc: - log.info(f"Starting client mode, room: {args.id}, SOCKS5 port: {args.socks_port}") - asyncio.run(run_client(room_url, key, args.socks_port)) - else: - log.error("Specify --srv or --cnc") - -if __name__ == "__main__": - main() diff --git a/code/vcsend.py b/code/vcsend.py deleted file mode 100755 index 5cdfdbd..0000000 --- a/code/vcsend.py +++ /dev/null @@ -1,625 +0,0 @@ -#!/usr/bin/env python3 - -import asyncio -import json -import uuid -import websockets -import requests -import qrcode -import cv2 -import numpy as np -from urllib.parse import quote -from aiortc import RTCPeerConnection, RTCSessionDescription, RTCIceCandidate, RTCConfiguration, RTCIceServer -from aiortc.mediastreams import MediaStreamTrack -from av import VideoFrame -import base64 -from PIL import Image -from pyzbar import pyzbar -from fractions import Fraction - -CONFERENCE_ID = "75047680642749" -CONFERENCE_URL = f"https://telemost.yandex.ru/j/{CONFERENCE_ID}" -API_BASE = "https://cloud-api.yandex.ru/telemost_front/v2/telemost" - -QR_SIZE = 600 -CHUNK_SIZE = 400 -FRAME_RATE = 1 - - -def gen_uid(): - return str(uuid.uuid4()) - - -def get_connection_info(display_name): - url = f"{API_BASE}/conferences/{quote(CONFERENCE_URL, safe='')}/connection" - params = { - "next_gen_media_platform_allowed": "true", - "display_name": display_name, - "waiting_room_supported": "true" - } - headers = { - "User-Agent": "Mozilla/5.0 (X11; Linux x86_64; rv:149.0) Gecko/20100101 Firefox/149.0", - "Accept": "*/*", - "content-type": "application/json", - "Client-Instance-Id": gen_uid(), - "X-Telemost-Client-Version": "187.1.0", - "idempotency-key": gen_uid(), - "Origin": "https://telemost.yandex.ru", - "Referer": "https://telemost.yandex.ru/" - } - r = requests.get(url, params=params, headers=headers) - r.raise_for_status() - return r.json() - - -def make_qr_frame(data, pts): - qr = qrcode.QRCode( - version=None, - error_correction=qrcode.constants.ERROR_CORRECT_M, - box_size=12, - border=4 - ) - qr.add_data(data) - qr.make(fit=True) - img = qr.make_image(fill_color="black", back_color="white").resize( - (QR_SIZE, QR_SIZE), Image.NEAREST - ) - arr = np.array(img.convert('RGB')) - frame = VideoFrame.from_ndarray(arr, format="rgb24") - frame.pts = pts - frame.time_base = Fraction(1, FRAME_RATE) - return frame - - -def chunk_data(data, tid): - b64 = base64.b64encode(data).decode() - n = (len(b64) + CHUNK_SIZE - 1) // CHUNK_SIZE - return [json.dumps({"tid": tid, "idx": i, "total": n, - "data": b64[i * CHUNK_SIZE:(i + 1) * CHUNK_SIZE]}) - for i in range(n)] - - -class QRVideoTrack(MediaStreamTrack): - kind = "video" - - def __init__(self): - super().__init__() - self._frames = [] - self._idx = 0 - self._pts = 0 - - def set_data(self, chunks): - self._frames = [make_qr_frame(c, i) for i, c in enumerate(chunks)] - self._idx = 0 - self._pts = 0 - print(f" -> QRVideoTrack: {len(self._frames)} frames ready") - - async def recv(self): - await asyncio.sleep(1.0 / FRAME_RATE) - if not self._frames: - f = make_qr_frame("WAIT", self._pts) - self._pts += 1 - return f - f = self._frames[self._idx] - f.pts = self._pts - f.time_base = Fraction(1, FRAME_RATE) - self._pts += 1 - self._idx = (self._idx + 1) % len(self._frames) - if self._pts % 10 == 0: - print(f" -> QR sending frame {self._idx}/{len(self._frames)} pts={self._pts}") - return f - - -class QRReceiver: - def __init__(self): - self._bufs = {} - self.result = None - self._frame_count = 0 - self._cv2_detector = cv2.QRCodeDetector() - - def feed_frame(self, frame): - try: - self._frame_count += 1 - arr = frame.to_ndarray(format="rgb24") - h, w = arr.shape[:2] - - if self._frame_count <= 3: - cv2.imwrite(f"/tmp/qr_recv_{self._frame_count}.png", - cv2.cvtColor(arr, cv2.COLOR_RGB2BGR)) - print(f" -> [recv] saved /tmp/qr_recv_{self._frame_count}.png {w}x{h}") - - gray = cv2.cvtColor(arr, cv2.COLOR_RGB2GRAY) - - variants = [gray] - up2 = cv2.resize(gray, (w * 2, h * 2), interpolation=cv2.INTER_CUBIC) - variants.append(up2) - _, thresh = cv2.threshold(gray, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU) - variants.append(thresh) - variants.append(cv2.resize(thresh, (w * 2, h * 2), interpolation=cv2.INTER_NEAREST)) - - decoded = set() - for v in variants: - try: - for code in pyzbar.decode(v): - decoded.add(code.data.decode('utf-8')) - except Exception: - pass - try: - val, _, _ = self._cv2_detector.detectAndDecode(v) - if val: - decoded.add(val) - except Exception: - pass - - for raw in decoded: - try: - pkt = json.loads(raw) - tid = pkt["tid"] - idx = pkt["idx"] - total = pkt["total"] - data = pkt["data"] - if tid not in self._bufs: - self._bufs[tid] = {} - if idx not in self._bufs[tid]: - self._bufs[tid][idx] = data - print(f" -> QR chunk {idx+1}/{total} tid={tid[:8]}") - if len(self._bufs[tid]) == total: - b64 = "".join(self._bufs[tid][i] for i in range(total)) - self.result = base64.b64decode(b64) - print(f" -> QR COMPLETE: {len(self.result)} bytes") - return True - except Exception: - pass - - if self._frame_count % 30 == 0: - print(f" -> [recv] {self._frame_count} frames, no QR, size={w}x{h}") - - except Exception as e: - print(f" -> [recv] feed_frame err: {e}") - return False - - -async def process_track(track, receiver, name): - print(f" -> [{name}] video processor started") - count = 0 - while True: - try: - frame = await asyncio.wait_for(track.recv(), timeout=30.0) - count += 1 - if count <= 5 or count % 50 == 0: - print(f" -> [{name}] frame #{count} {frame.width}x{frame.height}") - if receiver.feed_frame(frame): - return - except asyncio.TimeoutError: - print(f" -> [{name}] track frozen after {count} frames") - return - except Exception as e: - print(f" -> [{name}] track err: {e}") - return - - -def make_ice_servers(raw_list): - result = [] - for s in raw_list: - urls = s.get("urls", []) - cred = s.get("credential", "") - user = s.get("username", "") - if cred: - result.append(RTCIceServer(urls=urls, credential=cred, username=user)) - else: - result.append(RTCIceServer(urls=urls)) - return result or [RTCIceServer(urls=["stun:stun.rtc.yandex.net:3478"])] - - -async def connect_peer(name, conn): - room_id = conn["room_id"] - peer_id = conn["peer_id"] - credentials = conn["credentials"] - ws_url = conn["client_configuration"]["media_server_url"] - is_sender = "Sender" in name - - print(f"\n -> [{name}] room={room_id} peer={peer_id[:8]} sender={is_sender}") - - default_ice = [RTCIceServer(urls=["stun:stun.rtc.yandex.net:3478"])] - - video_track = QRVideoTrack() if is_sender else None - receiver_obj = QRReceiver() if not is_sender else None - track_tasks = [] - - # используем списки чтобы можно было переприсваивать в замыканиях - pc_sub_ref = [RTCPeerConnection(RTCConfiguration(iceServers=default_ice))] - pc_pub_ref = [RTCPeerConnection(RTCConfiguration(iceServers=default_ice))] - - if is_sender: - pc_pub_ref[0].addTrack(video_track) - - ws = await websockets.connect( - ws_url, - additional_headers={ - "Origin": "https://telemost.yandex.ru", - "User-Agent": "Mozilla/5.0 (X11; Linux x86_64; rv:149.0) Gecko/20100101 Firefox/149.0" - } - ) - print(f" -> [{name}] WS connected") - - async def send(obj): - await ws.send(json.dumps(obj)) - - async def ack(uid): - await send({"uid": uid, "ack": {"status": {"code": "OK", "description": ""}}}) - - def setup_pc(pc_sub, pc_pub): - @pc_sub.on("track") - def on_track(track): - print(f" -> [{name}] GOT TRACK kind={track.kind}") - if track.kind == "video" and receiver_obj is not None: - t = asyncio.ensure_future(process_track(track, receiver_obj, name)) - track_tasks.append(t) - - @pc_sub.on("connectionstatechange") - async def _s(): - print(f" -> [{name}] sub={pc_sub.connectionState}") - - @pc_pub.on("connectionstatechange") - async def _p(): - print(f" -> [{name}] pub={pc_pub.connectionState}") - - @pc_sub.on("iceconnectionstatechange") - async def _si(): - print(f" -> [{name}] sub ICE={pc_sub.iceConnectionState}") - - @pc_pub.on("iceconnectionstatechange") - async def _pi(): - print(f" -> [{name}] pub ICE={pc_pub.iceConnectionState}") - - @pc_sub.on("icecandidate") - async def on_sub_ice(e): - if e.candidate: - await send({ - "uid": gen_uid(), - "webrtcIceCandidate": { - "candidate": e.candidate.candidate, - "sdpMid": e.candidate.sdpMid, - "sdpMlineIndex": e.candidate.sdpMLineIndex, - "usernameFragment": "", - "target": "SUBSCRIBER", - "pcSeq": 1 - } - }) - print(f" -> [{name}] >> sub ICE sent") - - @pc_pub.on("icecandidate") - async def on_pub_ice(e): - if e.candidate: - await send({ - "uid": gen_uid(), - "webrtcIceCandidate": { - "candidate": e.candidate.candidate, - "sdpMid": e.candidate.sdpMid, - "sdpMlineIndex": e.candidate.sdpMLineIndex, - "usernameFragment": "", - "target": "PUBLISHER", - "pcSeq": 1 - } - }) - print(f" -> [{name}] >> pub ICE sent") - - setup_pc(pc_sub_ref[0], pc_pub_ref[0]) - - hello = { - "uid": gen_uid(), - "hello": { - "participantMeta": { - "name": name, "role": "SPEAKER", "description": "", - "sendAudio": False, "sendVideo": is_sender - }, - "participantAttributes": {"name": name, "role": "SPEAKER", "description": ""}, - "sendAudio": False, - "sendVideo": is_sender, - "sendSharing": False, - "participantId": peer_id, - "roomId": room_id, - "serviceName": "telemost", - "credentials": credentials, - "capabilitiesOffer": { - "offerAnswerMode": ["SEPARATE"], - "initialSubscriberOffer": ["ON_HELLO"], - "slotsMode": ["FROM_CONTROLLER"], - "simulcastMode": ["DISABLED", "STATIC"], - "selfVadStatus": ["FROM_SERVER", "FROM_CLIENT"], - "dataChannelSharing": ["TO_RTP"], - "videoEncoderConfig": ["NO_CONFIG", "ONLY_INIT_CONFIG", "RUNTIME_CONFIG"], - "dataChannelVideoCodec": ["VP8", "UNIQUE_CODEC_FROM_TRACK_DESCRIPTION"], - "bandwidthLimitationReason": ["BANDWIDTH_REASON_DISABLED", "BANDWIDTH_REASON_ENABLED"], - "sdkDefaultDeviceManagement": ["SDK_DEFAULT_DEVICE_MANAGEMENT_DISABLED", "SDK_DEFAULT_DEVICE_MANAGEMENT_ENABLED"], - "joinOrderLayout": ["JOIN_ORDER_LAYOUT_DISABLED", "JOIN_ORDER_LAYOUT_ENABLED"], - "pinLayout": ["PIN_LAYOUT_DISABLED"], - "sendSelfViewVideoSlot": ["SEND_SELF_VIEW_VIDEO_SLOT_DISABLED", "SEND_SELF_VIEW_VIDEO_SLOT_ENABLED"], - "serverLayoutTransition": ["SERVER_LAYOUT_TRANSITION_DISABLED"], - "sdkPublisherOptimizeBitrate": ["SDK_PUBLISHER_OPTIMIZE_BITRATE_DISABLED", "SDK_PUBLISHER_OPTIMIZE_BITRATE_FULL", "SDK_PUBLISHER_OPTIMIZE_BITRATE_ONLY_SELF"], - "sdkNetworkLostDetection": ["SDK_NETWORK_LOST_DETECTION_DISABLED"], - "sdkNetworkPathMonitor": ["SDK_NETWORK_PATH_MONITOR_DISABLED"], - "publisherVp9": ["PUBLISH_VP9_DISABLED", "PUBLISH_VP9_ENABLED"], - "svcMode": ["SVC_MODE_DISABLED", "SVC_MODE_L3T3", "SVC_MODE_L3T3_KEY"], - "subscriberOfferAsyncAck": ["SUBSCRIBER_OFFER_ASYNC_ACK_DISABLED", "SUBSCRIBER_OFFER_ASYNC_ACK_ENABLED"], - "androidBluetoothRoutingFix": ["ANDROID_BLUETOOTH_ROUTING_FIX_DISABLED"], - "fixedIceCandidatesPoolSize": ["FIXED_ICE_CANDIDATES_POOL_SIZE_DISABLED"], - "sdkAndroidTelecomIntegration": ["SDK_ANDROID_TELECOM_INTEGRATION_DISABLED"], - "setActiveCodecsMode": ["SET_ACTIVE_CODECS_MODE_DISABLED", "SET_ACTIVE_CODECS_MODE_VIDEO_ONLY"], - "subscriberDtlsPassiveMode": ["SUBSCRIBER_DTLS_PASSIVE_MODE_DISABLED"], - "publisherOpusDred": ["PUBLISHER_OPUS_DRED_DISABLED"], - "publisherOpusLowBitrate": ["PUBLISHER_OPUS_LOW_BITRATE_DISABLED"], - "sdkAndroidDestroySessionOnTaskRemoved": ["SDK_ANDROID_DESTROY_SESSION_ON_TASK_REMOVED_DISABLED"], - "svcModes": ["FALSE"], - "reportTelemetryModes": ["TRUE"], - "keepDefaultDevicesModes": ["FALSE"] - }, - "sdkInfo": { - "implementation": "browser", - "version": "5.27.0", - "userAgent": "Mozilla/5.0 (X11; Linux x86_64; rv:149.0) Gecko/20100101 Firefox/149.0", - "hwConcurrency": 24 - }, - "sdkInitializationId": gen_uid(), - "disablePublisher": not is_sender, - "disableSubscriber": False, - "disableSubscriberAudio": True - } - } - - await send(hello) - print(f" -> [{name}] hello sent") - - pub_sdp_sent = False - - async def ws_loop(): - nonlocal pub_sdp_sent - - try: - async for raw in ws: - msg = json.loads(raw) - keys = [k for k in msg if k != "uid"] - if not keys: - continue - mtype = keys[0] - uid = msg.get("uid", "") - print(f" -> [{name}] << {mtype}") - - if mtype == "ack": - pass - - elif mtype == "serverHello": - sh = msg["serverHello"] - raw_ice = sh.get("rtcConfiguration", {}).get("iceServers", []) - if raw_ice: - ice = make_ice_servers(raw_ice) - old_sub = pc_sub_ref[0] - old_pub = pc_pub_ref[0] - pc_sub_ref[0] = RTCPeerConnection(RTCConfiguration(iceServers=ice)) - pc_pub_ref[0] = RTCPeerConnection(RTCConfiguration(iceServers=ice)) - if is_sender and video_track: - pc_pub_ref[0].addTrack(video_track) - setup_pc(pc_sub_ref[0], pc_pub_ref[0]) - await old_sub.close() - await old_pub.close() - print(f" -> [{name}] PC recreated with TURN") - await ack(uid) - - elif mtype == "subscriberSdpOffer": - offer_sdp = msg["subscriberSdpOffer"]["sdp"] - pc_seq = msg["subscriberSdpOffer"]["pcSeq"] - pc_sub = pc_sub_ref[0] - pc_pub = pc_pub_ref[0] - - await pc_sub.setRemoteDescription( - RTCSessionDescription(sdp=offer_sdp, type="offer") - ) - answer = await pc_sub.createAnswer() - await pc_sub.setLocalDescription(answer) - - await send({ - "uid": gen_uid(), - "subscriberSdpAnswer": { - "pcSeq": pc_seq, - "sdp": pc_sub.localDescription.sdp - } - }) - print(f" -> [{name}] >> subscriberSdpAnswer") - await ack(uid) - - if not is_sender: - await send({ - "uid": gen_uid(), - "setSlots": { - "slots": [ - {"width": 1280, "height": 720}, - {"width": 640, "height": 360} - ], - "audioSlotsCount": 0, - "key": 1, - "shutdownAllVideo": None, - "withSelfView": False, - "selfViewVisibility": "ON_LOADING_THEN_SHOW", - "gridConfig": {} - } - }) - print(f" -> [{name}] >> setSlots (запросили маршрутизацию видео у сервера!)") - - if is_sender and not pub_sdp_sent: - await asyncio.sleep(0.3) - pub_offer = await pc_pub.createOffer() - await pc_pub.setLocalDescription(pub_offer) - tracks_info = [] - for t in pc_pub.getTransceivers(): - if t.sender.track: - tracks_info.append({ - "mid": t.mid, - "transceiverMid": t.mid, - "kind": t.sender.track.kind.upper(), - "priority": 0, - "label": "QRVideoTrack", - "codecs": {}, - "groupId": 1, - "description": "" - }) - await send({ - "uid": gen_uid(), - "publisherSdpOffer": { - "pcSeq": 1, - "sdp": pc_pub.localDescription.sdp, - "tracks": tracks_info - } - }) - pub_sdp_sent = True - print(f" -> [{name}] >> publisherSdpOffer") - - elif mtype == "publisherSdpAnswer": - await pc_pub_ref[0].setRemoteDescription( - RTCSessionDescription(sdp=msg["publisherSdpAnswer"]["sdp"], type="answer") - ) - print(f" -> [{name}] publisher answer set") - await ack(uid) - - elif mtype == "webrtcIceCandidate": - cand = msg["webrtcIceCandidate"] - candidate_str = cand.get("candidate", "") - target = cand.get("target", "") - sdp_mid = cand.get("sdpMid", "0") - sdp_mline = cand.get("sdpMlineIndex", 0) - - if not candidate_str: - continue - parts = candidate_str.split() - if len(parts) < 8: - continue - - try: - tcptype = None - if "tcptype" in parts: - ti = parts.index("tcptype") - tcptype = parts[ti + 1] - - ice_c = RTCIceCandidate( - component=int(parts[1]), - foundation=parts[0].replace("candidate:", ""), - ip=parts[4], - port=int(parts[5]), - priority=int(parts[3]), - protocol=parts[2].lower(), - type=parts[7], - tcpType=tcptype, - sdpMid=sdp_mid, - sdpMLineIndex=sdp_mline - ) - if target == "SUBSCRIBER": - await pc_sub_ref[0].addIceCandidate(ice_c) - print(f" -> [{name}] sub ICE: {parts[2]} {parts[4]}:{parts[5]}") - elif target == "PUBLISHER": - await pc_pub_ref[0].addIceCandidate(ice_c) - print(f" -> [{name}] pub ICE: {parts[2]} {parts[4]}:{parts[5]}") - except Exception as e: - print(f" -> [{name}] ICE err: {e}") - - elif mtype in ("setSlots", "slotsConfig", "slotsMeta", "vadActivity", - "updateDescription", "upsertDescription", "sdkCodecsInfo", - "pingPong", "selfQualityReport", "upsertParticipantsQualityReport"): - await ack(uid) - - else: - print(f" -> [{name}] unhandled: {mtype}") - if uid: - await ack(uid) - - except websockets.exceptions.ConnectionClosed as e: - print(f" -> [{name}] WS closed: {e}") - except Exception as e: - import traceback - print(f" -> [{name}] WS err: {e}") - traceback.print_exc() - - ws_task = asyncio.create_task(ws_loop()) - - return { - "name": name, - "ws": ws, - "ws_task": ws_task, - "pc_pub_ref": pc_pub_ref, - "pc_sub_ref": pc_sub_ref, - "video_track": video_track, - "receiver": receiver_obj, - "track_tasks": track_tasks - } - - -async def run(): - print(""" - VCSend - Video QR Transfer - Request/Response over Yandex Telemost SFU - by zowue for olc -""") - - print("[0/3] Getting conference info...") - sender_conn = get_connection_info("QR_Sender") - receiver_conn = get_connection_info("QR_Receiver") - print(f" -> sender room: {sender_conn['room_id']}") - print(f" -> receiver room: {receiver_conn['room_id']}") - - print("\n[1/3] Connecting sender...") - sender = await connect_peer("QR_Sender", sender_conn) - await asyncio.sleep(5) - - print("\n[2/3] Connecting receiver...") - receiver = await connect_peer("QR_Receiver", receiver_conn) - await asyncio.sleep(5) - - print("\n[3/3] Transfer...") - url = "zarazaex.xyz/curl.txt" - if not url.startswith("http"): - url = "https://" + url - - print(f" -> fetching {url}") - resp = requests.get(url, timeout=10) - resp.raise_for_status() - data = resp.content - print(f" -> got {len(data)} bytes") - - tid = gen_uid() - chunks = chunk_data(data, tid) - sender["video_track"].set_data(chunks) - print(f" -> {len(chunks)} QR frames set, waiting for decode...") - - for i in range(300): - await asyncio.sleep(1) - if i % 15 == 0: - print(f" -> {i}s elapsed") - if receiver["receiver"] and receiver["receiver"].result is not None: - result = receiver["receiver"].result - print(f"\n :P got {len(result)} bytes\n") - print("--- content ---") - try: - print(result.decode("utf-8")) - except Exception: - print(f"[binary {len(result)} bytes]") - print("--- end ---\n") - break - else: - print(" X timeout — check /tmp/qr_recv_*.png") - - for p in [sender, receiver]: - p["ws_task"].cancel() - try: - await p["ws"].close() - except Exception: - pass - try: - await p["pc_pub_ref"][0].close() - await p["pc_sub_ref"][0].close() - except Exception: - pass - - -if __name__ == "__main__": - try: - asyncio.run(run()) - except KeyboardInterrupt: - print("\ninterrupted") \ No newline at end of file