refactor: simplify and consolidate PoC scripts for SaluteJazz, Telemost, and WebStream DataChannel testing

This commit is contained in:
zarazaex69
2026-04-19 22:55:15 +03:00
parent 74c67b1057
commit 73db058480
3 changed files with 375 additions and 1738 deletions

1444
code/jazz_poc.py Normal file → Executable file

File diff suppressed because it is too large Load Diff

View File

@@ -1,366 +1,168 @@
#!/usr/bin/env python3
"""PoC: Yandex Telemost DataChannel via Websocket and AIORTC."""
import asyncio
import json
import uuid
import time
import websockets
import requests
import websockets
from urllib.parse import quote
from aiortc import RTCPeerConnection, RTCSessionDescription, RTCIceCandidate, RTCConfiguration, RTCIceServer
CONFERENCE_ID = "75047680642749"
CONFERENCE_URL = f"https://telemost.yandex.ru/j/{CONFERENCE_ID}"
API_BASE = "https://cloud-api.yandex.ru/telemost_front/v2/telemost"
ICE_SERVER = RTCIceServer(urls=["stun:stun.rtc.yandex.net:3478"])
TEST_MESSAGES = ["Hello Yandex Telemost!", "Hello world", "X" * 100, "Final test"]
def generate_uuid():
return str(uuid.uuid4())
def _gen_uuid() -> str: return str(uuid.uuid4())
def get_connection_info(display_name):
url = f"{API_BASE}/conferences/{quote(CONFERENCE_URL, safe='')}/connection"
params = {
"next_gen_media_platform_allowed": "true",
"display_name": display_name,
"waiting_room_supported": "true"
}
def _get_conn_info(display_name: str) -> dict:
url = f"{API_BASE}/conferences/{quote(f'https://telemost.yandex.ru/j/{CONFERENCE_ID}', safe='')}/connection"
headers = {
"User-Agent": "Mozilla/5.0 (X11; Linux x86_64; rv:149.0) Gecko/20100101 Firefox/149.0",
"Accept": "*/*",
"content-type": "application/json",
"Client-Instance-Id": generate_uuid(),
"User-Agent": "Mozilla/5.0 (Linux x86_64)",
"Client-Instance-Id": _gen_uuid(),
"X-Telemost-Client-Version": "187.1.0",
"idempotency-key": generate_uuid(),
"Origin": "https://telemost.yandex.ru",
"Referer": "https://telemost.yandex.ru/"
"idempotency-key": _gen_uuid(),
}
response = requests.get(url, params=params, headers=headers)
response.raise_for_status()
return response.json()
params = {"next_gen_media_platform_allowed": "true", "display_name": display_name, "waiting_room_supported": "true"}
resp = requests.get(url, params=params, headers=headers)
resp.raise_for_status()
return resp.json()
async def create_peer(name, is_server=False):
conn_info = get_connection_info(name)
room_id = conn_info["room_id"]
peer_id = conn_info["peer_id"]
credentials = conn_info["credentials"]
ws_url = conn_info["client_configuration"]["media_server_url"]
pc_sub = RTCPeerConnection(RTCConfiguration(
iceServers=[RTCIceServer(urls=["stun:stun.rtc.yandex.net:3478"])]
))
pc_pub = RTCPeerConnection(RTCConfiguration(
iceServers=[RTCIceServer(urls=["stun:stun.rtc.yandex.net:3478"])]
))
async def _create_peer(name: str, is_server: bool = False, stats: dict = None) -> dict:
info = _get_conn_info(name)
ws = await websockets.connect(info["client_configuration"]["media_server_url"])
pc_sub = RTCPeerConnection(RTCConfiguration(iceServers=[ICE_SERVER]))
pc_pub = RTCPeerConnection(RTCConfiguration(iceServers=[ICE_SERVER]))
dc_pub = pc_pub.createDataChannel("olcrtc", ordered=True)
dc_pub_open = asyncio.Event()
stats = {
"sent": 0,
"received": 0,
"bytes_sent": 0,
"bytes_received": 0,
"messages": []
}
dc_ready = asyncio.Event()
@dc_pub.on("open")
def on_pub_open():
dc_pub_open.set()
def on_open(): dc_ready.set()
@dc_pub.on("message")
def on_pub_msg(msg):
stats["received"] += 1
stats["bytes_received"] += len(msg)
stats["messages"].append(("received", msg, time.time()))
def on_pub_msg(msg): stats["recv"] += 1
@pc_sub.on("datachannel")
def on_sub_dc(channel):
@channel.on("message")
def on_message(message):
stats["received"] += 1
stats["bytes_received"] += len(message)
stats["messages"].append(("received", message, time.time()))
def on_msg(m):
stats["recv"] += 1
if is_server and channel.label == "olcrtc":
response = f"Echo: {message}"
try:
dc_pub.send(response)
dc_pub.send(f"Echo: {m}")
stats["sent"] += 1
stats["bytes_sent"] += len(response)
stats["messages"].append(("sent", response, time.time()))
except:
pass
ws = await websockets.connect(ws_url)
hello_msg = {
"uid": generate_uuid(),
except: pass
await ws.send(json.dumps({
"uid": _gen_uuid(),
"hello": {
"participantMeta": {"name": name, "role": "SPEAKER", "sendAudio": False, "sendVideo": False},
"participantAttributes": {"name": name, "role": "SPEAKER"},
"sendAudio": False,
"sendVideo": False,
"sendSharing": False,
"participantId": peer_id,
"roomId": room_id,
"serviceName": "telemost",
"credentials": credentials,
"capabilitiesOffer": {
"offerAnswerMode": ["SEPARATE"],
"initialSubscriberOffer": ["ON_HELLO"],
"slotsMode": ["FROM_CONTROLLER"],
"simulcastMode": ["DISABLED"],
"selfVadStatus": ["FROM_SERVER"],
"dataChannelSharing": ["TO_RTP"]
},
"sendAudio": False, "sendVideo": False, "sendSharing": False,
"participantId": info["peer_id"], "roomId": info["room_id"],
"serviceName": "telemost", "credentials": info["credentials"],
"capabilitiesOffer": {"offerAnswerMode": ["SEPARATE"], "initialSubscriberOffer": ["ON_HELLO"], "slotsMode": ["FROM_CONTROLLER"], "simulcastMode": ["DISABLED"], "selfVadStatus": ["FROM_SERVER"], "dataChannelSharing": ["TO_RTP"]},
"sdkInfo": {"implementation": "python", "version": "1.0.0", "userAgent": f"OlcRTC-{name}"},
"sdkInitializationId": generate_uuid(),
"disablePublisher": False,
"disableSubscriber": False
"sdkInitializationId": _gen_uuid(), "disablePublisher": False, "disableSubscriber": False
}
}
await ws.send(json.dumps(hello_msg))
publisher_sdp_sent = False
async def ws_handler():
nonlocal publisher_sdp_sent
}))
async def _ws_loop():
pub_sdp_sent = False
while True:
try:
data = json.loads(await ws.recv())
uid = data.get("uid")
if "serverHello" in data:
await ws.send(json.dumps({"uid": data["uid"], "ack": {"status": {"code": "OK"}}}))
if "subscriberSdpOffer" in data and not publisher_sdp_sent:
await pc_sub.setRemoteDescription(RTCSessionDescription(
sdp=data["subscriberSdpOffer"]["sdp"], type="offer"
))
await ws.send(json.dumps({"uid": uid, "ack": {"status": {"code": "OK"}}}))
answer = await pc_sub.createAnswer()
await pc_sub.setLocalDescription(answer)
await ws.send(json.dumps({
"uid": generate_uuid(),
"subscriberSdpAnswer": {
"pcSeq": data["subscriberSdpOffer"]["pcSeq"],
"sdp": pc_sub.localDescription.sdp
}
}))
await ws.send(json.dumps({"uid": data["uid"], "ack": {"status": {"code": "OK"}}}))
elif "subscriberSdpOffer" in data and not pub_sdp_sent:
sdp = data["subscriberSdpOffer"]
await pc_sub.setRemoteDescription(RTCSessionDescription(sdp=sdp["sdp"], type="offer"))
ans = await pc_sub.createAnswer()
await pc_sub.setLocalDescription(ans)
await ws.send(json.dumps({"uid": _gen_uuid(), "subscriberSdpAnswer": {"pcSeq": sdp["pcSeq"], "sdp": pc_sub.localDescription.sdp}}))
await ws.send(json.dumps({"uid": uid, "ack": {"status": {"code": "OK"}}}))
await asyncio.sleep(0.3)
pub_offer = await pc_pub.createOffer()
await pc_pub.setLocalDescription(pub_offer)
await ws.send(json.dumps({"uid": _gen_uuid(), "publisherSdpOffer": {"pcSeq": 1, "sdp": pc_pub.localDescription.sdp}}))
pub_sdp_sent = True
await ws.send(json.dumps({
"uid": generate_uuid(),
"publisherSdpOffer": {
"pcSeq": 1,
"sdp": pc_pub.localDescription.sdp
}
}))
publisher_sdp_sent = True
if "publisherSdpAnswer" in data:
await pc_pub.setRemoteDescription(RTCSessionDescription(
sdp=data["publisherSdpAnswer"]["sdp"], type="answer"
))
await ws.send(json.dumps({"uid": data["uid"], "ack": {"status": {"code": "OK"}}}))
if "webrtcIceCandidate" in data:
elif "publisherSdpAnswer" in data:
await pc_pub.setRemoteDescription(RTCSessionDescription(sdp=data["publisherSdpAnswer"]["sdp"], type="answer"))
await ws.send(json.dumps({"uid": uid, "ack": {"status": {"code": "OK"}}}))
elif "webrtcIceCandidate" in data:
cand = data["webrtcIceCandidate"]
try:
parts = cand["candidate"].split()
if len(parts) >= 8:
ice = RTCIceCandidate(
component=int(parts[1]),
foundation=parts[0].replace("candidate:", ""),
ip=parts[4],
port=int(parts[5]),
priority=int(parts[3]),
protocol=parts[2],
type=parts[7],
sdpMid=cand["sdpMid"],
sdpMLineIndex=cand["sdpMlineIndex"]
)
if cand.get("target") == "SUBSCRIBER":
await pc_sub.addIceCandidate(ice)
elif cand.get("target") == "PUBLISHER":
await pc_pub.addIceCandidate(ice)
except:
pass
except:
break
@pc_sub.on("icecandidate")
async def on_sub_ice(event):
if event.candidate:
await ws.send(json.dumps({
"uid": generate_uuid(),
"webrtcIceCandidate": {
"candidate": event.candidate.candidate,
"sdpMid": event.candidate.sdpMid,
"sdpMlineIndex": event.candidate.sdpMLineIndex,
"target": "SUBSCRIBER",
"pcSeq": 1
}
}))
@pc_pub.on("icecandidate")
async def on_pub_ice(event):
if event.candidate:
await ws.send(json.dumps({
"uid": generate_uuid(),
"webrtcIceCandidate": {
"candidate": event.candidate.candidate,
"sdpMid": event.candidate.sdpMid,
"sdpMlineIndex": event.candidate.sdpMLineIndex,
"target": "PUBLISHER",
"pcSeq": 1
}
}))
ws_task = asyncio.create_task(ws_handler())
return {
"name": name,
"dc_pub": dc_pub,
"dc_pub_open": dc_pub_open,
"stats": stats,
"ws": ws,
"ws_task": ws_task,
"pc_sub": pc_sub,
"pc_pub": pc_pub
}
parts = cand["candidate"].split()
if len(parts) >= 8:
ice = RTCIceCandidate(component=int(parts[1]), foundation=parts[0].replace("candidate:", ""), ip=parts[4], port=int(parts[5]), priority=int(parts[3]), protocol=parts[2], type=parts[7], sdpMid=cand["sdpMid"], sdpMLineIndex=cand["sdpMlineIndex"])
await (pc_sub if cand.get("target") == "SUBSCRIBER" else pc_pub).addIceCandidate(ice)
except Exception: break
async def run_full_test():
print(r"""
OlcRTC - Full Test Suite
DataChannel over Yandex Telemost SFU
by zowue for olc
""")
async def _on_ice(event, target):
if event.candidate:
await ws.send(json.dumps({"uid": _gen_uuid(), "webrtcIceCandidate": {"candidate": event.candidate.candidate, "sdpMid": event.candidate.sdpMid, "sdpMlineIndex": event.candidate.sdpMLineIndex, "target": target, "pcSeq": 1}}))
pc_sub.on("icecandidate", lambda e: asyncio.create_task(_on_ice(e, "SUBSCRIBER")))
pc_pub.on("icecandidate", lambda e: asyncio.create_task(_on_ice(e, "PUBLISHER")))
results = {
"server_connected": False,
"client_connected": False,
"messages_sent": 0,
"messages_received": 0,
"bytes_sent": 0,
"bytes_received": 0,
"latency_ms": [],
"errors": []
}
return {"dc": dc_pub, "ready": dc_ready, "task": asyncio.create_task(_ws_loop()), "ws": ws, "pc_sub": pc_sub, "pc_pub": pc_pub}
async def run_poc() -> dict:
print("\n--- Yandex Telemost PoC ---")
results = {"server_ok": False, "client_ok": False, "sent": 0, "recv": 0, "errors": []}
s_stats, c_stats = {"sent": 0, "recv": 0}, {"sent": 0, "recv": 0}
print("[1/4] Creating server peer...")
print("[1/3] Connecting Server & Client...")
try:
server = await create_peer("Server", is_server=True)
await asyncio.wait_for(server["dc_pub_open"].wait(), timeout=10.0)
results["server_connected"] = True
print(" :P Server connected")
server = await _create_peer("Server", is_server=True, stats=s_stats)
await asyncio.wait_for(server["ready"].wait(), 10.0)
results["server_ok"] = True
client = await _create_peer("Client", is_server=False, stats=c_stats)
await asyncio.wait_for(client["ready"].wait(), 10.0)
results["client_ok"] = True
print(" :P Peers connected")
except Exception as e:
results["errors"].append(f"Server failed: {e}")
print(f" X Error: {e}")
results["errors"].append(str(e))
return results
print("\n[2/4] Creating client peer...")
try:
client = await create_peer("Client", is_server=False)
await asyncio.wait_for(client["dc_pub_open"].wait(), timeout=10.0)
results["client_connected"] = True
print(" :P Client connected")
except Exception as e:
results["errors"].append(f"Client failed: {e}")
print(f" X Error: {e}")
return results
print("\n[3/4] Testing message exchange...")
await asyncio.sleep(2)
test_messages = [
"Hello OlcRTC!",
"я всего лиш хотел дружить зачем тролякатся",
"X" * 100,
"Final test"
]
try:
for i, msg in enumerate(test_messages, 1):
send_time = time.time()
client["dc_pub"].send(msg)
client["stats"]["sent"] += 1
client["stats"]["bytes_sent"] += len(msg)
print(f" -> Sent message {i}/{len(test_messages)} ({len(msg)}b)")
print("\n[2/3] Exchanging messages...")
await asyncio.sleep(1)
for idx, msg in enumerate(TEST_MESSAGES, 1):
try:
client["dc"].send(msg)
c_stats["sent"] += 1
print(f" -> Sent: {msg}")
await asyncio.sleep(0.5)
await asyncio.sleep(3)
results["messages_sent"] = client["stats"]["sent"]
results["messages_received"] = client["stats"]["received"]
results["bytes_sent"] = client["stats"]["bytes_sent"]
results["bytes_received"] = client["stats"]["bytes_received"]
print(f" :P Sent: {results['messages_sent']} messages")
print(f" :P Received: {results['messages_received']} responses")
except Exception as e:
results["errors"].append(f"Exchange failed: {e}")
print(f" X Error: {e}")
print("\n[4/4] Cleaning up...")
try:
server["ws_task"].cancel()
client["ws_task"].cancel()
await server["ws"].close()
await client["ws"].close()
await server["pc_sub"].close()
await server["pc_pub"].close()
await client["pc_sub"].close()
await client["pc_pub"].close()
print(" :P Cleanup complete")
except:
pass
except Exception as e:
results["errors"].append(f"Sending {idx} failed: {str(e)}")
await asyncio.sleep(2)
results["sent"], results["recv"] = c_stats["sent"], c_stats["recv"]
print("\n[3/3] Cleaning up...")
for p in (server, client):
p["task"].cancel()
await p["ws"].close()
await p["pc_sub"].close()
await p["pc_pub"].close()
return results
def print_results(results):
print(r"""
TEST RESULTS
""")
print("Connection Status:")
print(f" - Server: {':P Connected' if results['server_connected'] else 'X Failed'}")
print(f" - Client: {':P Connected' if results['client_connected'] else 'X Failed'}")
print("\nMessage Exchange:")
print(f" - Sent: {results['messages_sent']} messages ({results['bytes_sent']} bytes)")
print(f" - Received: {results['messages_received']} messages ({results['bytes_received']} bytes)")
success_rate = (results['messages_received'] / results['messages_sent'] * 100) if results['messages_sent'] > 0 else 0
print(f" - Success Rate: {success_rate:.1f}%")
if results['errors']:
print("\nErrors:")
for err in results['errors']:
print(f" - {err}")
if results['messages_received'] > 0:
print("\n :P TEST PASSED - OlcRTC PoC works!")
else:
print("\n X TEST FAILED - Check errors above")
async def main():
results = await run_full_test()
print_results(results)
def print_results(res: dict):
print("\n--- TEST RESULTS ---")
print(f"Server: {':P' if res['server_ok'] else 'X'} / Client: {':P' if res['client_ok'] else 'X'}")
print(f"Messages: Sent {res['sent']} / Recv {res['recv']}")
if res['errors']:
for e in res['errors']: print(f" Error: {e}")
print(f"\n{':P SUCCESS' if res['sent'] and res['sent'] == res['recv'] else 'X FAILED'}\n")
if __name__ == "__main__":
try:
asyncio.run(main())
except KeyboardInterrupt:
print("\n\nTest interrupted.")
try: res = asyncio.run(run_poc()); print_results(res)
except KeyboardInterrupt: pass

247
code/wbstream_poc.py Normal file → Executable file
View File

@@ -1,218 +1,113 @@
#!/usr/bin/env python3
"""PoC: передача произвольных данных через WB Stream SFU (LiveKit).
В отличие от Яндекс Телемоста или SaluteJazz, WB Stream использует
стандартный немодифицированный протокол LiveKit (v16). Это означает,
что нам не нужно собирать кастомные WebSocket-обработчики или костыли
с Protobuf/JSON. Мы можем использовать официальную библиотеку livekit.
"""
"""PoC: WB Stream DataChannel over LiveKit."""
import asyncio
import logging
import time
import uuid
import requests
try:
from livekit import rtc
except ImportError:
print("\n[!] Ошибка: не установлена библиотека livekit.")
print("Выполните: pip install livekit requests\n")
print("[!] Error: livekit library not installed.\nRun: pip install livekit requests")
exit(1)
logging.getLogger("livekit").setLevel(logging.WARNING)
API_BASE = "https://stream.wb.ru"
WS_URL = "wss://wbstream01-el.wb.ru:7880"
TEST_MESSAGES = ["Hello WB Stream!", "Hello world", "X" * 100, "Final test"]
def generate_uuid():
return str(uuid.uuid4())
def get_room_token(room_id: str, display_name: str) -> str:
def _get_room_token(room_id: str, display_name: str) -> tuple[str, str]:
"""Retrieves the room token via the guest API."""
headers = {
"User-Agent": "Mozilla/5.0 (X11; Linux x86_64; rv:149.0)",
"Accept": "application/json, text/plain, */*",
"User-Agent": "Mozilla/5.0 (Linux x86_64)",
"Content-Type": "application/json"
}
# 0. Гостевая регистрация для получения Bearer токена
reg_resp = requests.post(
# 1. Guest Registration
reg_req = requests.post(
f"{API_BASE}/auth/api/v1/auth/user/guest-register",
json={"displayName": display_name, "device": {"deviceName": "Linux Device", "deviceType": "PARTICIPANT_DEVICE_TYPE_WEB_DESKTOP"}},
json={"displayName": display_name, "device": {"deviceName": "Linux", "deviceType": "PARTICIPANT_DEVICE_TYPE_WEB_DESKTOP"}},
headers=headers
)
reg_resp.raise_for_status()
access_token = reg_resp.json()["accessToken"]
headers["Authorization"] = f"Bearer {access_token}"
reg_req.raise_for_status()
headers["Authorization"] = f"Bearer {reg_req.json()['accessToken']}"
# 2. Room Creation (if empty)
if not room_id:
# 1. Создаем комнату (только для первого пира)
resp = requests.post(
f"{API_BASE}/api-room/api/v2/room",
json={"roomType": "ROOM_TYPE_ALL_ON_SCREEN", "roomPrivacy": "ROOM_PRIVACY_FREE"},
headers=headers
)
resp.raise_for_status()
room_id = resp.json()["roomId"]
# 2. Джойнимся
resp = requests.post(f"{API_BASE}/api-room/api/v1/room/{room_id}/join", json={}, headers=headers)
resp.raise_for_status()
# 3. Получаем токен
params = {
"deviceType": "PARTICIPANT_DEVICE_TYPE_WEB_DESKTOP",
"displayName": display_name
}
resp = requests.get(
f"{API_BASE}/api-room-manager/api/v1/room/{room_id}/token",
params=params,
headers=headers
)
resp.raise_for_status()
return room_id, resp.json()["roomToken"]
room_req = requests.post(f"{API_BASE}/api-room/api/v2/room", json={"roomType": "ROOM_TYPE_ALL_ON_SCREEN", "roomPrivacy": "ROOM_PRIVACY_FREE"}, headers=headers)
room_req.raise_for_status()
room_id = room_req.json()["roomId"]
# 3. Join & Token
requests.post(f"{API_BASE}/api-room/api/v1/room/{room_id}/join", json={}, headers=headers).raise_for_status()
tok_req = requests.get(f"{API_BASE}/api-room-manager/api/v1/room/{room_id}/token", params={"deviceType": "PARTICIPANT_DEVICE_TYPE_WEB_DESKTOP", "displayName": display_name}, headers=headers)
tok_req.raise_for_status()
return room_id, tok_req.json()["roomToken"]
async def run_full_test():
print(r"""
OlcRTC - Full Test Suite
DataChannel over WB Stream SFU (LiveKit)
by zowue for olc
""")
async def run_poc() -> dict:
"""Runs the complete PoC flow."""
print("\n--- WB Stream PoC ---")
results = {"server_ok": False, "client_ok": False, "sent": 0, "recv": 0, "errors": []}
results = {
"server_connected": False,
"client_connected": False,
"messages_sent": 0,
"messages_received": 0,
"bytes_sent": 0,
"bytes_received": 0,
"errors": []
}
print("[1/4] Creating server peer...")
server, client = rtc.Room(), rtc.Room()
shared_room_id, _ = _get_room_token("", "OlcRTC-Server")
print("[1/3] Connecting Server & Client...")
try:
shared_room_id, server_token = get_room_token("", "OlcRTC-Server")
server_room = rtc.Room()
server_stats = {"sent": 0, "received": 0}
@server_room.on("data_received")
def on_s_data(dp: rtc.DataPacket):
text = dp.data.decode('utf-8', errors='replace')
server_stats["received"] += 1
if dp.topic == "olcrtc_poc":
resp_text = f"Echo: {text}"
resp_data = resp_text.encode('utf-8')
asyncio.create_task(server_room.local_participant.publish_data(resp_data, topic="olcrtc_poc"))
server_stats["sent"] += 1
shared_room_id, server_tok = _get_room_token("", "OlcRTC-Server")
_, client_tok = _get_room_token(shared_room_id, "OlcRTC-Client")
await server_room.connect(WS_URL, server_token)
results["server_connected"] = True
print(f" :P Server connected (Room: {shared_room_id})")
@server.on("data_received")
def on_server_data(dp: rtc.DataPacket):
if dp.topic == "olcrtc":
asyncio.create_task(server.local_participant.publish_data(f"Echo: {dp.data.decode()}".encode(), topic="olcrtc"))
@client.on("data_received")
def on_client_data(dp: rtc.DataPacket):
results["recv"] += 1
await server.connect(WS_URL, server_tok)
results["server_ok"] = True
await client.connect(WS_URL, client_tok)
results["client_ok"] = True
print(f" :P Peers connected to room: {shared_room_id}")
except Exception as e:
results["errors"].append(f"Server failed: {e}")
print(f" X Error: {e}")
results["errors"].append(str(e))
return results
print("\n[2/4] Creating client peer...")
try:
_, client_token = get_room_token(shared_room_id, "OlcRTC-Client")
client_room = rtc.Room()
client_stats = {"sent": 0, "received": 0, "bytes_sent": 0, "bytes_received": 0}
@client_room.on("data_received")
def on_c_data(dp: rtc.DataPacket):
text = dp.data.decode('utf-8', errors='replace')
client_stats["received"] += 1
client_stats["bytes_received"] += len(dp.data)
await client_room.connect(WS_URL, client_token)
results["client_connected"] = True
print(" :P Client connected")
except Exception as e:
results["errors"].append(f"Client failed: {e}")
print(f" X Error: {e}")
return results
print("\n[2/3] Exchanging messages...")
await asyncio.sleep(1)
print("\n[3/4] Testing message exchange...")
await asyncio.sleep(2) # Даем время LiveKit поднять WebRTC транспорт
test_messages = [
"Hello WB Stream!",
"я всего лиш хотел дружить зачем тролякатся",
"X" * 100,
"Final test"
]
try:
for i, msg in enumerate(test_messages, 1):
msg_bytes = msg.encode('utf-8')
# Отправляем DataPacket через LiveKit
await client_room.local_participant.publish_data(msg_bytes, topic="olcrtc_poc")
client_stats["sent"] += 1
client_stats["bytes_sent"] += len(msg_bytes)
print(f" -> Sent message {i}/{len(test_messages)} ({len(msg_bytes)}b)")
for idx, msg in enumerate(TEST_MESSAGES, 1):
try:
await client.local_participant.publish_data(msg.encode(), topic="olcrtc")
results["sent"] += 1
print(f" -> Sent: {msg}")
await asyncio.sleep(0.5)
await asyncio.sleep(3)
results["messages_sent"] = client_stats["sent"]
results["messages_received"] = client_stats["received"]
results["bytes_sent"] = client_stats["bytes_sent"]
results["bytes_received"] = client_stats["bytes_received"]
print(f" :P Sent: {results['messages_sent']} messages")
print(f" :P Received: {results['messages_received']} responses")
except Exception as e:
results["errors"].append(f"Exchange failed: {e}")
print(f" X Error: {e}")
except Exception as e:
results["errors"].append(f"Sending {idx} failed: {str(e)}")
await asyncio.sleep(2)
print("\n[4/4] Cleaning up...")
try:
await server_room.disconnect()
await client_room.disconnect()
print(" :P Cleanup complete")
except:
pass
print("\n[3/3] Cleaning up...")
await server.disconnect()
await client.disconnect()
return results
def print_results(results):
print(r"""
TEST RESULTS
""")
print("Connection Status:")
print(f" - Server: {':P Connected' if results.get('server_connected') else 'X Failed'}")
print(f" - Client: {':P Connected' if results.get('client_connected') else 'X Failed'}")
print("\nMessage Exchange:")
print(f" - Sent: {results.get('messages_sent', 0)} messages ({results.get('bytes_sent', 0)} bytes)")
print(f" - Received: {results.get('messages_received', 0)} messages ({results.get('bytes_received', 0)} bytes)")
sent = results.get('messages_sent', 0)
success_rate = (results.get('messages_received', 0) / sent * 100) if sent > 0 else 0
print(f" - Success Rate: {success_rate:.1f}%")
if results.get('errors'):
print("\nErrors:")
for err in results['errors']:
print(f" - {err}")
if results.get('messages_received', 0) > 0:
print("\n :P TEST PASSED - WB Stream PoC works!")
else:
print("\n X TEST FAILED - Check errors above")
async def main():
results = await run_full_test()
print_results(results)
def print_results(res: dict):
print("\n--- TEST RESULTS ---")
print(f"Server: {':P' if res['server_ok'] else 'X'} / Client: {':P' if res['client_ok'] else 'X'}")
print(f"Messages: Sent {res['sent']} / Recv {res['recv']}")
if res['errors']:
for e in res['errors']: print(f" Error: {e}")
print(f"\n{':P SUCCESS' if res['sent'] and res['sent'] == res['recv'] else 'X FAILED'}\n")
if __name__ == "__main__":
try:
asyncio.run(main())
res = asyncio.run(run_poc())
print_results(res)
except KeyboardInterrupt:
print("\n\nTest interrupted.")
pass