diff --git a/code/limits.py b/code/limits.py new file mode 100644 index 0000000..f57d054 --- /dev/null +++ b/code/limits.py @@ -0,0 +1,1054 @@ +#!/usr/bin/env python3 + +import asyncio +import json +import uuid +import websockets +import requests +from urllib.parse import quote +from aiortc import RTCPeerConnection, RTCSessionDescription, RTCIceCandidate, RTCConfiguration, RTCIceServer + +CHUNK_SIZE = 7168 +HEADER_SIZE = 1024 + +class ChunkedReceiver: + def __init__(self): + self.buffers = {} + self.completed = {} + + def handle_chunk(self, packet): + if len(packet) < HEADER_SIZE: + return None + + header_bytes = packet[:HEADER_SIZE].rstrip(b'\x00') + chunk_data = packet[HEADER_SIZE:] + + try: + header = json.loads(header_bytes) + except: + return None + + tid = header["tid"] + idx = header["idx"] + total = header["total"] + + if tid not in self.buffers: + self.buffers[tid] = {"chunks": {}, "total": total, "received": 0} + + buf = self.buffers[tid] + + if idx not in buf["chunks"]: + buf["chunks"][idx] = chunk_data + buf["received"] += 1 + + if buf["received"] == buf["total"]: + complete = b"".join(buf["chunks"][i] for i in range(buf["total"])) + self.completed[tid] = complete + del self.buffers[tid] + return tid + + return None + +def chunk_data(data, transfer_id): + total_size = len(data) + chunk_count = (total_size + CHUNK_SIZE - 1) // CHUNK_SIZE + packets = [] + + for i in range(chunk_count): + start = i * CHUNK_SIZE + end = min(start + CHUNK_SIZE, total_size) + chunk = data[start:end] + + header = {"tid": transfer_id, "idx": i, "total": chunk_count, "size": total_size} + header_json = json.dumps(header).encode() + header_padded = header_json.ljust(HEADER_SIZE, b'\x00') + + packets.append(header_padded + chunk) + + return packets + +CONFERENCE_ID = "33734896687006" +CONFERENCE_URL = f"https://telemost.yandex.ru/j/{CONFERENCE_ID}" +API_BASE = "https://cloud-api.yandex.ru/telemost_front/v2/telemost" + +def generate_uuid(): + return str(uuid.uuid4()) + +def get_connection_info(display_name): + url = f"{API_BASE}/conferences/{quote(CONFERENCE_URL, safe='')}/connection" + params = { + "next_gen_media_platform_allowed": "true", + "display_name": display_name, + "waiting_room_supported": "true" + } + + headers = { + "User-Agent": "Mozilla/5.0 (X11; Linux x86_64; rv:149.0) Gecko/20100101 Firefox/149.0", + "Accept": "*/*", + "content-type": "application/json", + "Client-Instance-Id": generate_uuid(), + "X-Telemost-Client-Version": "187.1.0", + "idempotency-key": generate_uuid(), + "Origin": "https://telemost.yandex.ru", + "Referer": "https://telemost.yandex.ru/" + } + + response = requests.get(url, params=params, headers=headers) + response.raise_for_status() + return response.json() + +async def check_datachannel_limits(): + limits = { + "supported": False, + "sctp_port": None, + "max_message_size": None, + "max_message_size_mb": None, + "ordered_supported": None, + "protocol": None + } + + try: + conn_info = get_connection_info("LimitCheck") + ws_url = conn_info["client_configuration"]["media_server_url"] + + async with websockets.connect(ws_url) as ws: + hello = { + "uid": generate_uuid(), + "hello": { + "participantMeta": {"name": "LimitCheck", "role": "SPEAKER", "sendAudio": False, "sendVideo": False}, + "participantAttributes": {"name": "LimitCheck", "role": "SPEAKER"}, + "sendAudio": False, "sendVideo": False, "sendSharing": False, + "participantId": conn_info["peer_id"], + "roomId": conn_info["room_id"], + "serviceName": "telemost", + "credentials": conn_info["credentials"], + "capabilitiesOffer": {"offerAnswerMode": ["SEPARATE"], "initialSubscriberOffer": ["ON_HELLO"]}, + "sdkInfo": {"implementation": "python", "version": "1.0.0"}, + "sdkInitializationId": generate_uuid(), + "disablePublisher": False, "disableSubscriber": False + } + } + await ws.send(json.dumps(hello)) + + for _ in range(10): + data = json.loads(await ws.recv()) + if "serverHello" in data: + await ws.send(json.dumps({"uid": data["uid"], "ack": {"status": {"code": "OK"}}})) + if "subscriberSdpOffer" in data: + sdp = data["subscriberSdpOffer"]["sdp"] + if "m=application" in sdp and "SCTP" in sdp: + limits["supported"] = True + limits["protocol"] = "SCTP over DTLS" + for line in sdp.split('\r\n'): + if 'sctp-port:' in line: + limits["sctp_port"] = int(line.split(':')[1].strip()) + if 'max-message-size:' in line: + size = int(line.split(':')[1].strip()) + limits["max_message_size"] = size + limits["max_message_size_mb"] = size / 1024 / 1024 + limits["ordered_supported"] = True + break + except Exception as e: + limits["error"] = str(e) + + return limits + +async def check_conference_limits(): + limits = { + "max_participants": None, + "session_timeout_ms": None, + "session_timeout_min": None, + "reconnect_wait_ms": None, + "media_platform": None, + "connection_type": None + } + + try: + conn_info = get_connection_info("ConfCheck") + limits["max_participants"] = conn_info.get("conference_limit") + limits["connection_type"] = conn_info.get("connection_type") + limits["media_platform"] = conn_info.get("media_platform") + + client_config = conn_info.get("client_configuration", {}) + limits["session_timeout_ms"] = client_config.get("goloom_session_open_ms") + if limits["session_timeout_ms"]: + limits["session_timeout_min"] = limits["session_timeout_ms"] / 1000 / 60 + limits["reconnect_wait_ms"] = client_config.get("wait_time_to_reconnect_ms") + except Exception as e: + limits["error"] = str(e) + + return limits + +async def check_audio_limits(): + limits = { + "codecs": [], + "opus_config": {}, + "red_supported": False + } + + try: + conn_info = get_connection_info("AudioCheck") + ws_url = conn_info["client_configuration"]["media_server_url"] + + async with websockets.connect(ws_url) as ws: + hello = { + "uid": generate_uuid(), + "hello": { + "participantMeta": {"name": "AudioCheck", "role": "SPEAKER", "sendAudio": False, "sendVideo": False}, + "participantAttributes": {"name": "AudioCheck", "role": "SPEAKER"}, + "sendAudio": False, "sendVideo": False, "sendSharing": False, + "participantId": conn_info["peer_id"], + "roomId": conn_info["room_id"], + "serviceName": "telemost", + "credentials": conn_info["credentials"], + "capabilitiesOffer": {"offerAnswerMode": ["SEPARATE"], "initialSubscriberOffer": ["ON_HELLO"]}, + "sdkInfo": {"implementation": "python", "version": "1.0.0"}, + "sdkInitializationId": generate_uuid(), + "disablePublisher": False, "disableSubscriber": False + } + } + await ws.send(json.dumps(hello)) + + for _ in range(10): + data = json.loads(await ws.recv()) + if "serverHello" in data: + await ws.send(json.dumps({"uid": data["uid"], "ack": {"status": {"code": "OK"}}})) + if "subscriberSdpOffer" in data: + sdp = data["subscriberSdpOffer"]["sdp"] + for line in sdp.split('\r\n'): + if line.startswith('a=rtpmap:') and 'opus' in line.lower(): + parts = line.split() + codec_info = parts[1].split('/') + limits["codecs"].append({ + "name": codec_info[0], + "rate": int(codec_info[1]) if len(codec_info) > 1 else None, + "channels": int(codec_info[2]) if len(codec_info) > 2 else None + }) + if line.startswith('a=fmtp:') and 'opus' in sdp[max(0, sdp.find(line)-200):sdp.find(line)].lower(): + params = line.split(':', 1)[1].strip().split(';') + for param in params: + if '=' in param: + key, val = param.strip().split('=') + limits["opus_config"][key] = val + if 'red' in line.lower() and 'rtpmap' in line: + limits["red_supported"] = True + break + except Exception as e: + limits["error"] = str(e) + + return limits + +async def check_video_limits(): + limits = { + "codecs": [], + "simulcast_supported": False, + "bandwidth_layers": {} + } + + try: + conn_info = get_connection_info("VideoCheck") + ws_url = conn_info["client_configuration"]["media_server_url"] + + async with websockets.connect(ws_url) as ws: + hello = { + "uid": generate_uuid(), + "hello": { + "participantMeta": {"name": "VideoCheck", "role": "SPEAKER", "sendAudio": False, "sendVideo": False}, + "participantAttributes": {"name": "VideoCheck", "role": "SPEAKER"}, + "sendAudio": False, "sendVideo": False, "sendSharing": False, + "participantId": conn_info["peer_id"], + "roomId": conn_info["room_id"], + "serviceName": "telemost", + "credentials": conn_info["credentials"], + "capabilitiesOffer": { + "offerAnswerMode": ["SEPARATE"], + "initialSubscriberOffer": ["ON_HELLO"], + "simulcastMode": ["STATIC"] + }, + "sdkInfo": {"implementation": "python", "version": "1.0.0"}, + "sdkInitializationId": generate_uuid(), + "disablePublisher": False, "disableSubscriber": False + } + } + await ws.send(json.dumps(hello)) + + for _ in range(10): + data = json.loads(await ws.recv()) + if "serverHello" in data: + caps = data["serverHello"].get("capabilitiesAnswer", {}) + limits["simulcast_supported"] = caps.get("simulcastMode") in ["STATIC", "DYNAMIC"] + await ws.send(json.dumps({"uid": data["uid"], "ack": {"status": {"code": "OK"}}})) + if "subscriberSdpOffer" in data: + sdp = data["subscriberSdpOffer"]["sdp"] + codec_names = set() + for line in sdp.split('\r\n'): + if line.startswith('a=rtpmap:') and 'm=video' in sdp[:sdp.find(line)]: + parts = line.split() + if len(parts) >= 2: + codec_info = parts[1].split('/') + codec_name = codec_info[0].upper() + if codec_name not in codec_names and codec_name not in ['RTX', 'RED', 'ULPFEC']: + codec_names.add(codec_name) + limits["codecs"].append(codec_name) + break + + limits["bandwidth_layers"] = { + "L1": {"single": "1000 kbps"}, + "L2": {"low": "120 kbps", "med": "360 kbps"}, + "L3": {"low": "120 kbps", "med": "360 kbps", "hi": "800 kbps"}, + "L4": {"low": "120 kbps", "med": "360 kbps", "hi": "800 kbps", "ultra": "1000 kbps"} + } + except Exception as e: + limits["error"] = str(e) + + return limits + +async def check_ice_limits(): + limits = { + "stun_servers": [], + "turn_servers": [], + "ping_interval_ms": None, + "ack_timeout_ms": None + } + + try: + conn_info = get_connection_info("ICECheck") + ws_url = conn_info["client_configuration"]["media_server_url"] + + ice_servers = conn_info["client_configuration"].get("ice_servers", []) + for server in ice_servers: + urls = server.get("urls", []) + for url in urls: + if url.startswith("stun:"): + limits["stun_servers"].append(url) + elif url.startswith("turn:"): + limits["turn_servers"].append(url) + + async with websockets.connect(ws_url) as ws: + hello = { + "uid": generate_uuid(), + "hello": { + "participantMeta": {"name": "ICECheck", "role": "SPEAKER", "sendAudio": False, "sendVideo": False}, + "participantAttributes": {"name": "ICECheck", "role": "SPEAKER"}, + "sendAudio": False, "sendVideo": False, "sendSharing": False, + "participantId": conn_info["peer_id"], + "roomId": conn_info["room_id"], + "serviceName": "telemost", + "credentials": conn_info["credentials"], + "capabilitiesOffer": {"offerAnswerMode": ["SEPARATE"], "initialSubscriberOffer": ["ON_HELLO"]}, + "sdkInfo": {"implementation": "python", "version": "1.0.0"}, + "sdkInitializationId": generate_uuid(), + "disablePublisher": False, "disableSubscriber": False + } + } + await ws.send(json.dumps(hello)) + + for _ in range(10): + data = json.loads(await ws.recv()) + if "serverHello" in data: + ping_config = data["serverHello"].get("pingPongConfiguration", {}) + limits["ping_interval_ms"] = ping_config.get("pingInterval") + limits["ack_timeout_ms"] = ping_config.get("ackTimeout") + + rtc_config = data["serverHello"].get("rtcConfiguration", {}) + ice_servers_full = rtc_config.get("iceServers", []) + for server in ice_servers_full: + urls = server.get("urls", []) + for url in urls: + if url.startswith("stun:") and url not in limits["stun_servers"]: + limits["stun_servers"].append(url) + elif url.startswith("turn:") and url not in limits["turn_servers"]: + limits["turn_servers"].append(url) + break + except Exception as e: + limits["error"] = str(e) + + return limits + +async def create_test_peer(name, is_server=False, sctp_tweaks=None): + conn_info = get_connection_info(name) + room_id = conn_info["room_id"] + peer_id = conn_info["peer_id"] + credentials = conn_info["credentials"] + ws_url = conn_info["client_configuration"]["media_server_url"] + + pc_sub = RTCPeerConnection(RTCConfiguration( + iceServers=[RTCIceServer(urls=["stun:stun.rtc.yandex.net:3478"])] + )) + + pc_pub = RTCPeerConnection(RTCConfiguration( + iceServers=[RTCIceServer(urls=["stun:stun.rtc.yandex.net:3478"])] + )) + + dc_pub = pc_pub.createDataChannel("limits_test", ordered=True) + dc_pub_open = asyncio.Event() + + stats = { + "sent": 0, + "received": 0, + "bytes_sent": 0, + "bytes_received": 0, + "errors": [], + "last_received": None, + "events": [] + } + + @dc_pub.on("open") + def on_pub_open(): + stats["events"].append(f"[{name}] Publisher DC opened") + dc_pub_open.set() + + @dc_pub.on("message") + def on_pub_msg(msg): + stats["events"].append(f"[{name}] Publisher DC received {len(msg)}b") + stats["received"] += 1 + stats["bytes_received"] += len(msg) + stats["last_received"] = len(msg) + + @dc_pub.on("error") + def on_pub_error(error): + stats["events"].append(f"[{name}] Publisher DC error: {error}") + stats["errors"].append(f"Pub DC error: {error}") + + @pc_sub.on("datachannel") + def on_sub_dc(channel): + stats["events"].append(f"[{name}] Subscriber DC created: {channel.label}") + + @channel.on("open") + def on_sub_open(): + stats["events"].append(f"[{name}] Subscriber DC opened") + + @channel.on("message") + def on_message(message): + stats["events"].append(f"[{name}] Subscriber DC received {len(message)}b") + stats["received"] += 1 + stats["bytes_received"] += len(message) + stats["last_received"] = len(message) + + if is_server: + try: + stats["events"].append(f"[{name}] Echoing {len(message)}b back...") + dc_pub.send(message) + stats["sent"] += 1 + stats["bytes_sent"] += len(message) + stats["events"].append(f"[{name}] Echo sent successfully") + except Exception as e: + stats["events"].append(f"[{name}] Echo failed: {e}") + stats["errors"].append(f"Send error: {str(e)}") + + @channel.on("error") + def on_sub_error(error): + stats["events"].append(f"[{name}] Subscriber DC error: {error}") + stats["errors"].append(f"Sub DC error: {error}") + + ws = await websockets.connect(ws_url) + + hello_msg = { + "uid": generate_uuid(), + "hello": { + "participantMeta": {"name": name, "role": "SPEAKER", "sendAudio": False, "sendVideo": False}, + "participantAttributes": {"name": name, "role": "SPEAKER"}, + "sendAudio": False, + "sendVideo": False, + "sendSharing": False, + "participantId": peer_id, + "roomId": room_id, + "serviceName": "telemost", + "credentials": credentials, + "capabilitiesOffer": { + "offerAnswerMode": ["SEPARATE"], + "initialSubscriberOffer": ["ON_HELLO"], + "slotsMode": ["FROM_CONTROLLER"], + "simulcastMode": ["DISABLED"], + "selfVadStatus": ["FROM_SERVER"], + "dataChannelSharing": ["TO_RTP"] + }, + "sdkInfo": {"implementation": "python", "version": "1.0.0", "userAgent": f"LimitsTest-{name}"}, + "sdkInitializationId": generate_uuid(), + "disablePublisher": False, + "disableSubscriber": False + } + } + + await ws.send(json.dumps(hello_msg)) + + publisher_sdp_sent = False + + async def ws_handler(): + nonlocal publisher_sdp_sent + while True: + try: + data = json.loads(await ws.recv()) + + if "serverHello" in data: + await ws.send(json.dumps({"uid": data["uid"], "ack": {"status": {"code": "OK"}}})) + + if "subscriberSdpOffer" in data and not publisher_sdp_sent: + await pc_sub.setRemoteDescription(RTCSessionDescription( + sdp=data["subscriberSdpOffer"]["sdp"], type="offer" + )) + + answer = await pc_sub.createAnswer() + await pc_sub.setLocalDescription(answer) + + await ws.send(json.dumps({ + "uid": generate_uuid(), + "subscriberSdpAnswer": { + "pcSeq": data["subscriberSdpOffer"]["pcSeq"], + "sdp": pc_sub.localDescription.sdp + } + })) + + await ws.send(json.dumps({"uid": data["uid"], "ack": {"status": {"code": "OK"}}})) + await asyncio.sleep(0.3) + + pub_offer = await pc_pub.createOffer() + await pc_pub.setLocalDescription(pub_offer) + + sdp_modified = pc_pub.localDescription.sdp + if sctp_tweaks: + if 'max_message_size' in sctp_tweaks: + sdp_modified = sdp_modified.replace( + 'a=max-message-size:', + f'a=max-message-size:{sctp_tweaks["max_message_size"]}\r\na=old-max-message-size:' + ) + if 'sctp_buf' in sctp_tweaks: + if 'a=sctp-port:' in sdp_modified: + sdp_modified = sdp_modified.replace( + 'a=sctp-port:', + f'a=sctpmap:5000 webrtc-datachannel {sctp_tweaks["sctp_buf"]}\r\na=sctp-port:' + ) + + await ws.send(json.dumps({ + "uid": generate_uuid(), + "publisherSdpOffer": { + "pcSeq": 1, + "sdp": sdp_modified + } + })) + publisher_sdp_sent = True + + if "publisherSdpAnswer" in data: + await pc_pub.setRemoteDescription(RTCSessionDescription( + sdp=data["publisherSdpAnswer"]["sdp"], type="answer" + )) + await ws.send(json.dumps({"uid": data["uid"], "ack": {"status": {"code": "OK"}}})) + + if "webrtcIceCandidate" in data: + cand = data["webrtcIceCandidate"] + try: + parts = cand["candidate"].split() + if len(parts) >= 8: + ice = RTCIceCandidate( + component=int(parts[1]), + foundation=parts[0].replace("candidate:", ""), + ip=parts[4], + port=int(parts[5]), + priority=int(parts[3]), + protocol=parts[2], + type=parts[7], + sdpMid=cand["sdpMid"], + sdpMLineIndex=cand["sdpMlineIndex"] + ) + + if cand.get("target") == "SUBSCRIBER": + await pc_sub.addIceCandidate(ice) + elif cand.get("target") == "PUBLISHER": + await pc_pub.addIceCandidate(ice) + except: + pass + + except: + break + + @pc_sub.on("icecandidate") + async def on_sub_ice(event): + if event.candidate: + await ws.send(json.dumps({ + "uid": generate_uuid(), + "webrtcIceCandidate": { + "candidate": event.candidate.candidate, + "sdpMid": event.candidate.sdpMid, + "sdpMlineIndex": event.candidate.sdpMLineIndex, + "target": "SUBSCRIBER", + "pcSeq": 1 + } + })) + + @pc_pub.on("icecandidate") + async def on_pub_ice(event): + if event.candidate: + await ws.send(json.dumps({ + "uid": generate_uuid(), + "webrtcIceCandidate": { + "candidate": event.candidate.candidate, + "sdpMid": event.candidate.sdpMid, + "sdpMlineIndex": event.candidate.sdpMLineIndex, + "target": "PUBLISHER", + "pcSeq": 1 + } + })) + + ws_task = asyncio.create_task(ws_handler()) + + return { + "name": name, + "dc_pub": dc_pub, + "dc_pub_open": dc_pub_open, + "stats": stats, + "ws": ws, + "ws_task": ws_task, + "pc_sub": pc_sub, + "pc_pub": pc_pub + } + +async def test_message_size_limits(max_size): + print("\n[REAL TEST] Testing message size limits...") + + try: + server = await create_test_peer("LimitServer", is_server=True) + await asyncio.wait_for(server["dc_pub_open"].wait(), timeout=10.0) + print(" :P Server ready") + + client = await create_test_peer("LimitClient", is_server=False) + await asyncio.wait_for(client["dc_pub_open"].wait(), timeout=10.0) + print(" :P Client ready") + + await asyncio.sleep(3) + + sizes_kb = [1, 6, 8, 10, 12, 14, 16, 18, 20, 24, 28, 32] + tests = [(f"{kb}KB", kb * 1024, 3, 2) for kb in sizes_kb] + + results = [] + last_success_size = 0 + + for test_name, size, send_wait, echo_wait in tests: + client["stats"]["last_received"] = None + server["stats"]["last_received"] = None + client["stats"]["events"].clear() + server["stats"]["events"].clear() + + try: + print(f"\n -> Sending {test_name}...") + + dc = client["dc_pub"] + print(f" -> DC state: {dc.readyState}") + print(f" -> DC bufferedAmount before: {dc.bufferedAmount}") + + data = "X" * size + + print(f" -> Calling send() for {size:,} bytes...") + try: + dc.send(data) + client["stats"]["sent"] += 1 + client["stats"]["bytes_sent"] += len(data) + print(f" -> send() returned successfully") + except Exception as send_err: + print(f" X send() raised exception: {send_err}") + results.append((test_name, False, f"Send exception: {send_err}")) + continue + + print(f" -> DC bufferedAmount after: {dc.bufferedAmount}") + print(f" -> Waiting {send_wait}s for delivery...") + + for i in range(send_wait): + await asyncio.sleep(1) + print(f" -> {i+1}s... buffered: {dc.bufferedAmount}, server events: {len(server['stats']['events'])}") + if server["stats"]["last_received"]: + print(f" -> Server received after {i+1}s!") + break + + server_received = server["stats"]["last_received"] + + if server_received: + print(f" -> Server got {server_received} bytes, waiting {echo_wait}s for echo...") + for i in range(echo_wait): + await asyncio.sleep(1) + if client["stats"]["last_received"]: + print(f" -> Client received echo after {i+1}s!") + break + print(f" -> {i+1}s... (client: {len(client['stats']['events'])} events)") + else: + print(f" -> Server: NO DATA") + print(f" -> Final bufferedAmount: {dc.bufferedAmount}") + + client_received = client["stats"]["last_received"] + + if client_received: + print(f" :P {test_name} ({size:,}b): SUCCESS") + results.append((test_name, True, None)) + last_success_size = size + elif server_received: + if server["stats"]["errors"]: + print(f" X {test_name}: Echo failed - {server['stats']['errors'][-1]}") + results.append((test_name, False, server['stats']['errors'][-1])) + else: + print(f" X {test_name}: Server got it, but no echo back") + results.append((test_name, False, "No echo received")) + else: + print(f" X {test_name}: Never reached server") + results.append((test_name, False, "Never reached server")) + break + + except Exception as e: + print(f" X {test_name} ({size:,}b): TEST FAILED - {str(e)}") + results.append((test_name, False, f"Test error: {str(e)[:30]}")) + break + + if last_success_size > 0: + print(f"\n :P Max working size: {last_success_size:,} bytes ({last_success_size/1024:.0f}KB)") + + server["ws_task"].cancel() + client["ws_task"].cancel() + await server["ws"].close() + await client["ws"].close() + await server["pc_sub"].close() + await server["pc_pub"].close() + await client["pc_sub"].close() + await client["pc_pub"].close() + + return results + + except Exception as e: + print(f" X Standard test failed: {e}") + return [] + +async def test_throughput_limits(): + print("\n[REAL TEST] Testing throughput limits...") + + try: + server = await create_test_peer("ThroughputServer", is_server=True) + await asyncio.wait_for(server["dc_pub_open"].wait(), timeout=10.0) + + client = await create_test_peer("ThroughputClient", is_server=False) + await asyncio.wait_for(client["dc_pub_open"].wait(), timeout=10.0) + print(" :P Peers ready") + + await asyncio.sleep(3) + + msg_size = 1024 + msg_count = 50 + data = "X" * msg_size + + start_time = asyncio.get_event_loop().time() + + for i in range(msg_count): + try: + client["dc_pub"].send(data) + client["stats"]["sent"] += 1 + client["stats"]["bytes_sent"] += len(data) + await asyncio.sleep(0.05) + except Exception as e: + print(f" X Send failed at message {i}: {e}") + break + + await asyncio.sleep(3) + + end_time = asyncio.get_event_loop().time() + duration = end_time - start_time + + total_bytes = client["stats"]["bytes_sent"] + throughput_kbps = (total_bytes * 8) / (duration * 1_000) + + print(f" :P Sent: {client['stats']['sent']} messages") + print(f" :P Received: {server['stats']['received']} messages") + print(f" :P Throughput: {throughput_kbps:.2f} Kbps") + print(f" :P Duration: {duration:.2f}s") + + server["ws_task"].cancel() + client["ws_task"].cancel() + await server["ws"].close() + await client["ws"].close() + await server["pc_sub"].close() + await server["pc_pub"].close() + await client["pc_sub"].close() + await client["pc_pub"].close() + + success = server['stats']['received'] > 0 + return [("Throughput test", success, f"{throughput_kbps:.2f} Kbps" if success else "No response")] + + except Exception as e: + print(f" X Test failed: {e}") + return [("Throughput test", False, str(e))] + +async def test_chunked_transfer(): + print("\n[REAL TEST] Testing large data transfer (app-level chunking)...") + + try: + server = await create_test_peer("BigServer", is_server=True) + await asyncio.wait_for(server["dc_pub_open"].wait(), timeout=10.0) + + client = await create_test_peer("BigClient", is_server=False) + await asyncio.wait_for(client["dc_pub_open"].wait(), timeout=10.0) + print(" :P Peers ready") + + await asyncio.sleep(3) + + test_sizes = [ + ("64KB", 64 * 1024), + ("128KB", 128 * 1024), + ("256KB", 256 * 1024), + ] + + results = [] + chunk_size = 8192 + + for name, total_size in test_sizes: + server["stats"]["bytes_received"] = 0 + server["stats"]["received"] = 0 + + chunks_needed = (total_size + chunk_size - 1) // chunk_size + + print(f"\n -> Sending {name} ({total_size:,}b) as {chunks_needed} x 8KB chunks...") + + start_time = asyncio.get_event_loop().time() + + try: + for i in range(chunks_needed): + chunk = ("X" * chunk_size) + + while client["dc_pub"].bufferedAmount > chunk_size * 2: + await asyncio.sleep(0.001) + + client["dc_pub"].send(chunk) + + await asyncio.sleep(2) + + end_time = asyncio.get_event_loop().time() + duration_ms = (end_time - start_time) * 1000 + + received = server["stats"]["bytes_received"] + + if received >= total_size: + throughput_kbps = (total_size * 8) / duration_ms + print(f" :P {name}: SUCCESS in {duration_ms:.0f}ms ({throughput_kbps:.1f} Kbps)") + results.append((name, True, f"{duration_ms:.0f}ms")) + else: + print(f" X {name}: PARTIAL ({received:,}b / {total_size:,}b)") + results.append((name, False, f"{received}b")) + break + + except Exception as e: + print(f" X {name}: ERROR - {e}") + results.append((name, False, str(e)[:30])) + break + + server["ws_task"].cancel() + client["ws_task"].cancel() + await server["ws"].close() + await client["ws"].close() + await server["pc_sub"].close() + await server["pc_pub"].close() + await client["pc_sub"].close() + await client["pc_pub"].close() + + return results + + except Exception as e: + print(f" X Test failed: {e}") + return [("Large transfer", False, str(e))] + +async def test_latency_microbench(): + print("\n[REAL TEST] Testing latency microbenchmarks...") + + try: + server = await create_test_peer("LatencyServer", is_server=True) + await asyncio.wait_for(server["dc_pub_open"].wait(), timeout=10.0) + + client = await create_test_peer("LatencyClient", is_server=False) + await asyncio.wait_for(client["dc_pub_open"].wait(), timeout=10.0) + print(" :P Latency test peers ready") + + await asyncio.sleep(3) + + test_cases = [ + ("Tiny (100b)", 100), + ("Small (1KB)", 1024), + ("Medium (4KB)", 4096), + ("Large (8KB)", 8192), + ] + + results = [] + + for name, size in test_cases: + latencies = [] + data = "L" * size + + print(f"\n -> Testing {name}...") + + for i in range(10): + client["stats"]["last_received"] = None + server["stats"]["last_received"] = None + + send_time = asyncio.get_event_loop().time() + client["dc_pub"].send(data) + + for _ in range(50): + await asyncio.sleep(0.01) + if client["stats"]["last_received"]: + recv_time = asyncio.get_event_loop().time() + rtt_us = (recv_time - send_time) * 1_000_000 + latencies.append(rtt_us) + break + + if latencies: + avg_lat = sum(latencies) / len(latencies) + min_lat = min(latencies) + max_lat = max(latencies) + + print(f" :P {name}: avg={avg_lat:.0f}µs, min={min_lat:.0f}µs, max={max_lat:.0f}µs") + results.append((name, True, f"{avg_lat:.0f}µs")) + else: + print(f" X {name}: No responses") + results.append((name, False, "No response")) + + server["ws_task"].cancel() + client["ws_task"].cancel() + await server["ws"].close() + await client["ws"].close() + await server["pc_sub"].close() + await server["pc_pub"].close() + await client["pc_sub"].close() + await client["pc_pub"].close() + + return results + + except Exception as e: + print(f" X Latency test failed: {e}") + return [("Latency test", False, str(e))] + +async def test_participant_limit(max_participants): + print("\n[REAL TEST] Testing participant limit...") + + peers = [] + try: + for i in range(min(3, max_participants)): + try: + peer = await create_test_peer(f"Peer{i+1}", is_server=False) + await asyncio.wait_for(peer["dc_pub_open"].wait(), timeout=10.0) + peers.append(peer) + print(f" :P Peer {i+1}/3 connected") + await asyncio.sleep(1) + except Exception as e: + print(f" X Peer {i+1} failed: {str(e)[:30]}") + break + + print(f" :P Successfully connected {len(peers)}/{min(3, max_participants)} peers") + + for peer in peers: + peer["ws_task"].cancel() + await peer["ws"].close() + await peer["pc_sub"].close() + await peer["pc_pub"].close() + + return [("Multi-peer test", True, f"{len(peers)} peers")] + + except Exception as e: + print(f" X Test failed: {e}") + return [("Multi-peer test", False, str(e))] + +async def check_all_limits(): + print(r""" + Yandex Telemost - Limits Check + Full verification of documented limits + by zowue for olc +""") + + print("[1/5] Checking DataChannel limits...") + dc_limits = await check_datachannel_limits() + if dc_limits['supported']: + print(f" :P DataChannel: SUPPORTED") + print(f" :P Protocol: {dc_limits['protocol']}") + print(f" :P SCTP Port: {dc_limits['sctp_port']}") + print(f" :P Max Message: {dc_limits['max_message_size_mb']:.0f}MB") + print(f" :P Ordered: YES") + else: + print(f" X DataChannel: NOT SUPPORTED") + + print("\n[2/5] Checking conference limits...") + conf_limits = await check_conference_limits() + print(f" :P Max Participants: {conf_limits['max_participants']}") + print(f" :P Media Platform: {conf_limits['media_platform']}") + print(f" :P Session Timeout: {conf_limits['session_timeout_min']:.1f}min") + print(f" :P Reconnect Wait: {conf_limits['reconnect_wait_ms']}ms") + + print("\n[3/5] Checking audio limits...") + audio_limits = await check_audio_limits() + if audio_limits['codecs']: + codec = audio_limits['codecs'][0] + print(f" :P Codec: {codec['name']} @ {codec['rate']//1000}kHz") + print(f" :P Channels: {codec['channels']} (stereo)") + if audio_limits['opus_config']: + print(f" :P FEC: {'YES' if audio_limits['opus_config'].get('useinbandfec') == '1' else 'NO'}") + print(f" :P DTX: {'YES' if audio_limits['opus_config'].get('usedtx') == '1' else 'NO'}") + print(f" :P RED: {'YES' if audio_limits['red_supported'] else 'NO'}") + + print("\n[4/5] Checking video limits...") + video_limits = await check_video_limits() + codecs_clean = [c for c in video_limits['codecs'] if c in ['VP8', 'VP9', 'H264', 'AV1']] + print(f" :P Codecs: {', '.join(codecs_clean)}") + print(f" :P Simulcast: {'YES' if video_limits['simulcast_supported'] else 'NO'}") + print(f" :P Layers: L1-L4 (120kbps - 1000kbps)") + + print("\n[5/5] Checking ICE/network limits...") + ice_limits = await check_ice_limits() + print(f" :P STUN Servers: {len(ice_limits['stun_servers'])}") + print(f" :P TURN Servers: {len(ice_limits['turn_servers'])}") + print(f" :P Ping Interval: {ice_limits['ping_interval_ms']}ms") + print(f" :P ACK Timeout: {ice_limits['ack_timeout_ms']}ms") + + test_results = [] + if dc_limits['supported'] and dc_limits['max_message_size']: + test_results = await test_message_size_limits(dc_limits['max_message_size']) + test_results.extend(await test_latency_microbench()) + test_results.extend(await test_throughput_limits()) + test_results.extend(await test_chunked_transfer()) + + if conf_limits['max_participants']: + test_results.extend(await test_participant_limit(conf_limits['max_participants'])) + + print(r""" + VERIFICATION RESULTS +""") + + doc_claims = { + "DataChannel max size": (1073741823, dc_limits.get('max_message_size')), + "SCTP port": (5000, dc_limits.get('sctp_port')), + "Max participants": (40, conf_limits.get('max_participants')), + "Session timeout": (120000, conf_limits.get('session_timeout_ms')), + "Ping interval": (5000, ice_limits.get('ping_interval_ms')), + "ACK timeout": (9000, ice_limits.get('ack_timeout_ms')) + } + + all_match = True + for claim, (expected, actual) in doc_claims.items(): + match = expected == actual + status = ":P" if match else "X" + print(f" - {claim}: {status} {actual} {'(OK)' if match else f'(expected {expected})'}") + if not match: + all_match = False + + if test_results: + print("\nReal Transfer Tests:") + for test_name, success, error in test_results: + status = ":P" if success else "X" + print(f" - {test_name}: {status} {'OK' if success else error[:30]}") + + if all_match: + print("\n :P ALL LIMITS VERIFIED - Documentation is accurate!") + else: + print("\n X MISMATCH DETECTED - Some limits differ from docs") + + return { + "datachannel": dc_limits, + "conference": conf_limits, + "audio": audio_limits, + "video": video_limits, + "ice": ice_limits + } + +if __name__ == "__main__": + try: + asyncio.run(check_all_limits()) + except KeyboardInterrupt: + print("\n\nCheck interrupted.")