From 73db05848062fc866ed314141ff3c8025e1f5bc5 Mon Sep 17 00:00:00 2001 From: zarazaex69 Date: Sun, 19 Apr 2026 22:55:15 +0300 Subject: [PATCH] refactor: simplify and consolidate PoC scripts for SaluteJazz, Telemost, and WebStream DataChannel testing --- code/jazz_poc.py | 1444 ++++++------------------------------------ code/telemost_poc.py | 422 ++++-------- code/wbstream_poc.py | 247 +++----- 3 files changed, 375 insertions(+), 1738 deletions(-) mode change 100644 => 100755 code/jazz_poc.py mode change 100644 => 100755 code/wbstream_poc.py diff --git a/code/jazz_poc.py b/code/jazz_poc.py old mode 100644 new mode 100755 index 621c74f..c71e067 --- a/code/jazz_poc.py +++ b/code/jazz_poc.py @@ -1,10 +1,5 @@ #!/usr/bin/env python3 -"""PoC: передача произвольных данных через SaluteJazz SFU (LiveKit 1.5.3). - -Два aiortc-клиента подключаются к анонимной комнате Jazz, -обмениваются сообщениями через DataChannel (LiveKit DataPacket protobuf), -весь трафик проходит через TURN-relay Сбера (a-t/s-t.salutejazz.ru). -""" +"""PoC: SaluteJazz DataChannel over LiveKit.""" import asyncio import io @@ -12,1291 +7,236 @@ import json import logging import time import uuid - import aiohttp -from aiortc import (RTCConfiguration, RTCIceCandidate, RTCIceServer, - RTCPeerConnection, RTCSessionDescription) +from aiortc import RTCConfiguration, RTCIceCandidate, RTCIceServer, RTCPeerConnection, RTCSessionDescription from aiortc.mediastreams import AudioStreamTrack from aiortc.rtcconfiguration import RTCBundlePolicy -logging.basicConfig( - level=logging.INFO, format="[%(levelname)s] %(message)s" -) -log = logging.getLogger(__name__) - logging.getLogger("aiortc").setLevel(logging.WARNING) -logging.getLogger("aioice").setLevel(logging.WARNING) -logging.getLogger("aioice.turn").setLevel(logging.INFO) API_BASE = "https://bk.salutejazz.ru" -JAZZ_HEADERS = { - "X-Jazz-ClientId": str(uuid.uuid4()), - "X-Jazz-AuthType": "ANONYMOUS", - "X-Client-AuthType": "ANONYMOUS", - "Content-Type": "application/json", -} +JAZZ_HEADERS = {"X-Jazz-ClientId": str(uuid.uuid4()), "X-Jazz-AuthType": "ANONYMOUS", "X-Client-AuthType": "ANONYMOUS", "Content-Type": "application/json"} +TEST_MESSAGES = ["Hello Jazz DC!", "Hello world", "X" * 100, "Final test"] -# Маппинг подсети relay ↔ TURN-хост. -_SUBNET_TO_TURN: dict[str, str] = { - "172.17": "s-t.", - "172.20": "a-t.", -} -_TURN_TO_SUBNET: dict[str, str] = { - v: k for k, v in _SUBNET_TO_TURN.items() -} +def _pb_varint(v: int) -> bytes: + b = bytearray() + while v > 0x7F: b.append((v & 0x7F) | 0x80); v >>= 7 + b.append(v & 0x7F) + return bytes(b) +def _pb_field(f: int, w: int, d: bytes) -> bytes: + t = _pb_varint((f << 3) | w) + return t + d if w == 0 else (t + _pb_varint(len(d)) + d if w == 2 else t + d) -def _pb_varint(value: int) -> bytes: - """Кодирование varint (protobuf wire format).""" - buf = bytearray() - while value > 0x7F: - buf.append((value & 0x7F) | 0x80) - value >>= 7 - buf.append(value & 0x7F) - return bytes(buf) - - -def _pb_field(field_number: int, wire_type: int, data: bytes) -> bytes: - """Кодирование одного protobuf-поля.""" - tag = _pb_varint((field_number << 3) | wire_type) - if wire_type == 0: - return tag + data - if wire_type == 2: - return tag + _pb_varint(len(data)) + data - return tag + data - - -def _read_varint(stream: io.BytesIO) -> int | None: - """Чтение varint из потока.""" - result = 0 - shift = 0 - while True: - b = stream.read(1) - if not b: - return None - byte = b[0] - result |= (byte & 0x7F) << shift - if not (byte & 0x80): - return result +def _read_varint(s: io.BytesIO) -> int | None: + res, shift = 0, 0 + while b := s.read(1): + res |= (b[0] & 0x7F) << shift + if not (b[0] & 0x80): return res shift += 7 - + return None def encode_data_packet(payload: bytes, topic: str = "") -> bytes: - """Сериализация LiveKit DataPacket с UserPacket. + uf = _pb_field(2, 2, payload) + (_pb_field(4, 2, topic.encode()) if topic else b"") + _pb_field(8, 2, str(uuid.uuid4()).encode()) + return _pb_field(1, 0, _pb_varint(0)) + _pb_field(2, 2, uf) - Args: - payload: Пользовательские данные. - topic: Топик сообщения (опционально). - - Returns: - Сериализованный protobuf DataPacket. - """ - user_fields = _pb_field(2, 2, payload) - if topic: - user_fields += _pb_field(4, 2, topic.encode()) - msg_id = str(uuid.uuid4()) - user_fields += _pb_field(8, 2, msg_id.encode()) - - user_packet = user_fields - - dp = _pb_field(1, 0, _pb_varint(0)) - dp += _pb_field(2, 2, user_packet) - return dp - - -def decode_data_packet( - raw: bytes, -) -> tuple[bytes, str] | None: - """Десериализация LiveKit DataPacket → (payload, topic). - - Returns: - Кортеж (payload, topic) или None если не UserPacket. - """ - stream = io.BytesIO(raw) - user_data: bytes | None = None - - while True: - tag_val = _read_varint(stream) - if tag_val is None: - break - field_number = tag_val >> 3 - wire_type = tag_val & 0x07 - - if wire_type == 0: - _read_varint(stream) - elif wire_type == 2: - length = _read_varint(stream) - if length is None: - break - data = stream.read(length) - if field_number == 2: - user_data = data - elif wire_type == 1: - stream.read(8) - elif wire_type == 5: - stream.read(4) - else: - break - - if user_data is None: - return None - - payload = b"" - topic = "" - inner = io.BytesIO(user_data) - while True: - tag_val = _read_varint(inner) - if tag_val is None: - break - fn = tag_val >> 3 - wt = tag_val & 0x07 - - if wt == 0: - _read_varint(inner) +def decode_data_packet(raw: bytes) -> tuple[bytes, str] | None: + s = io.BytesIO(raw) + ud = None + while (tg := _read_varint(s)) is not None: + wt = tg & 0x07 + if wt == 0: _read_varint(s) elif wt == 2: - length = _read_varint(inner) - if length is None: - break - data = inner.read(length) - if fn == 2: - payload = data - elif fn == 4: - topic = data.decode(errors="replace") - elif wt == 1: - inner.read(8) - elif wt == 5: - inner.read(4) - else: - break + l = _read_varint(s) + if l is None: break + d = s.read(l) + if (tg >> 3) == 2: ud = d + elif wt == 1: s.read(8) + elif wt == 5: s.read(4) + else: break + if ud is None: return None + p, t, ins = b"", "", io.BytesIO(ud) + while (tg := _read_varint(ins)) is not None: + wt = tg & 0x07 + if wt == 0: _read_varint(ins) + elif wt == 2: + l = _read_varint(ins) + if l is None: break + d = ins.read(l) + fn = tg >> 3 + if fn == 2: p = d + elif fn == 4: t = d.decode(errors="replace") + elif wt == 1: ins.read(8) + elif wt == 5: ins.read(4) + else: break + return p, t - return (payload, topic) - - -def _subnet_prefix(ip: str) -> str: - """Первые два октета IP-адреса (например '172.17').""" - parts = ip.split(".") - return f"{parts[0]}.{parts[1]}" if len(parts) == 4 else "" - - -def _relay_subnet_from_turn_url(turn_url: str) -> str: - """Определение подсети relay по hostname TURN-сервера. - - Args: - turn_url: TURN URL (напр. 'turn:s-t.salutejazz.ru:3478?transport=udp'). - - Returns: - Подсеть relay (напр. '172.17') или пустая строка. - """ - for prefix, subnet in _TURN_TO_SUBNET.items(): - if prefix in turn_url: - return subnet - return "" - - -def _pick_turn_for_subnet( - sfu_subnet: str, - all_turns: list[str], -) -> str | None: - """Выбрать TURN URL, совместимый с подсетью SFU.""" - prefix = _SUBNET_TO_TURN.get(sfu_subnet, "") - if prefix: - for u in all_turns: - if prefix in u and "transport=udp" in u: - return u - return all_turns[0] if all_turns else None - - -def gen_uuid() -> str: - """Генерация UUID v4.""" - return str(uuid.uuid4()) - - -async def create_room( - session: aiohttp.ClientSession, -) -> dict[str, str]: - """Создание комнаты и получение WS URL. - - Returns: - dict с ключами roomId, password, connectorUrl. - """ - create_resp = await session.post( - f"{API_BASE}/room/create-meeting", - headers=JAZZ_HEADERS, - json={ - "title": "PoC DataChannel", - "guestEnabled": True, - "lobbyEnabled": False, - "serverVideoRecordAutoStartEnabled": False, - "sipEnabled": False, - "moderatorEmails": [], - "summarizationEnabled": False, - "room3dEnabled": False, - "room3dScene": "XRLobby", - }, - ) - create_resp.raise_for_status() - room_data = await create_resp.json() - room_id: str = room_data["roomId"] - password: str = room_data["password"] - - preconnect_resp = await session.post( - f"{API_BASE}/room/{room_id}/preconnect", - headers=JAZZ_HEADERS, - json={ - "password": password, - "jazzNextMigration": { - "b2bBaseRoomSupport": True, - "demoRoomBaseSupport": True, - "demoRoomVersionSupport": 2, - "mediaWithoutAutoSubscribeSupport": True, - "webinarSpeakerSupport": True, - "webinarViewerSupport": True, - "sdkRoomSupport": True, - "sberclassRoomSupport": True, - }, - }, - ) - preconnect_resp.raise_for_status() - preconnect_data = await preconnect_resp.json() - - return { - "roomId": room_id, - "password": password, - "connectorUrl": preconnect_data["connectorUrl"], - } - - -def parse_ice_candidate( - cand_data: dict[str, object], -) -> RTCIceCandidate | None: - """Парсинг ICE-кандидата из строки candidate:... . - - Returns: - RTCIceCandidate или None, если не удалось распарсить. - """ - cand_str = str(cand_data.get("candidate", "")) - parts = cand_str.split() - if len(parts) < 8: - return None - - sdp_mid = cand_data.get("sdpMid") - sdp_m_line_index = cand_data.get("sdpMLineIndex", 0) - - return RTCIceCandidate( - component=int(parts[1]), - foundation=parts[0].replace("candidate:", ""), - ip=parts[4], - port=int(parts[5]), - priority=int(parts[3]), - protocol=parts[2], - type=parts[7], - sdpMid=str(sdp_mid) if sdp_mid is not None else "0", - sdpMLineIndex=int(sdp_m_line_index), - ) - - -async def create_peer( - name: str, - room_info: dict[str, str], - session: aiohttp.ClientSession, - is_server: bool = False, - known_sfu_subnet: str = "", -) -> dict[str, object]: - """Подключение пира к Jazz SFU. - - Args: - name: Имя участника. - room_info: Словарь с roomId, password, connectorUrl. - session: aiohttp-сессия. - is_server: Если True, эхо-ответ на входящие сообщения. - known_sfu_subnet: Подсеть SFU (напр. '172.17') для выбора TURN. - - Returns: - dict с ключами name, dc_ready, stats, ws, pc_sub, pc_pub, - sfu_subnet, subnet_ok и т.д. - """ - room_id = room_info["roomId"] - password = room_info["password"] - connector_url = room_info["connectorUrl"] - - group_id: str | None = None - ice_servers_config: list[RTCIceServer] = [] - all_turn_urls_saved: list[str] = [] - turn_creds_saved: dict[str, str | None] = { - "username": None, - "credential": None, - } - - pc_sub: RTCPeerConnection | None = None - pc_pub: RTCPeerConnection | None = None - dc_pub: object | None = None - dc_ready = asyncio.Event() - sub_ready = asyncio.Event() - - pending_ice_sub: list[dict[str, object]] = [] - pending_ice_pub: list[dict[str, object]] = [] - - relay_subnet: str = "" - sfu_ip: str | None = None - subnet_ok: bool | None = None - subnet_checked = asyncio.Event() - - stats: dict[str, object] = { - "sent": 0, - "received": 0, - "messages": [], - } - - ws = await session.ws_connect(connector_url) - - # --- join --- - await ws.send_json( - { - "roomId": room_id, - "event": "join", - "requestId": gen_uuid(), - "payload": { - "password": password, - "participantName": name, - "supportedFeatures": { - "attachedRooms": True, - "sessionGroups": True, - "transcription": True, - }, - "isSilent": False, - }, - } - ) - - publisher_offer_sent = False - - async def ws_loop() -> None: - nonlocal group_id, ice_servers_config - nonlocal pc_sub, pc_pub, dc_pub - nonlocal publisher_offer_sent - nonlocal relay_subnet, sfu_ip, subnet_ok +async def _create_peer(name: str, room: dict, session: aiohttp.ClientSession, is_server: bool = False, stats: dict = None) -> dict: + ws = await session.ws_connect(room["connectorUrl"]) + await ws.send_json({"roomId": room["roomId"], "event": "join", "requestId": str(uuid.uuid4()), "payload": {"password": room["password"], "participantName": name, "supportedFeatures": {"attachedRooms": True, "sessionGroups": True}, "isSilent": False}}) + + peer = {"ws": ws, "pc_sub": None, "pc_pub": None, "dc": None, "ready": asyncio.Event(), "sub_ready": asyncio.Event()} + group_id, p_ice_sub, p_ice_pub = None, [], [] + ice_servers = [] + async def ws_loop(): + nonlocal group_id async for msg in ws: if msg.type == aiohttp.WSMsgType.TEXT: data = json.loads(msg.data) - event = data.get("event", "") - payload = data.get("payload", {}) - method = payload.get("method", "") + ev = data.get("event", "") + p = data.get("payload", {}) + m = p.get("method", "") - if event == "join-response": - p = payload.get("participantGroup", {}) - group_id = p.get("groupId") - log.info( - f"[{name}] Подключён, groupId={group_id}" - ) - - elif event == "media-out" and method == "rtc:config": - config = payload.get("configuration", {}) - servers = config.get("iceServers", []) - for s in servers: - urls = s.get("urls", []) - turn_creds_saved["username"] = s.get( - "username" - ) - turn_creds_saved["credential"] = s.get( - "credential" - ) - for u in urls: - if u.startswith("turns:"): - continue - all_turn_urls_saved.append(u) - - log.info( - f"[{name}] Все TURN URL:" - f" {all_turn_urls_saved}" - ) - - if known_sfu_subnet: - chosen = _pick_turn_for_subnet( - known_sfu_subnet, - all_turn_urls_saved, - ) - else: - udp_urls = [ - u for u in all_turn_urls_saved - if "transport=udp" in u - ] - chosen = ( - udp_urls[0] - if udp_urls - else all_turn_urls_saved[0] - if all_turn_urls_saved - else None - ) - - if chosen: - ice_servers_config.append( - RTCIceServer( - urls=[chosen], - username=turn_creds_saved[ - "username" - ], - credential=turn_creds_saved[ - "credential" - ], - ) - ) - if chosen: - relay_subnet = ( - _relay_subnet_from_turn_url(chosen) - ) - log.info( - f"[{name}] Используется TURN:" - f" {chosen}" - f" (relay подсеть={relay_subnet})" - ) - - elif event == "media-out" and method == "rtc:join": - join_data = payload.get("join", {}) - log.info( - f"[{name}] rtc:join получен" - f" (LiveKit {join_data.get('serverVersion')})" - ) - - elif ( - event == "media-out" - and method == "rtc:offer" - and pc_sub is None - ): - desc = payload.get("description", {}) - sdp = desc.get("sdp", "") - sdp_type = desc.get("type", "offer") - - m_lines = [ - ln for ln in sdp.splitlines() - if ln.startswith("m=") - ] - bundle_lines = [ - ln for ln in sdp.splitlines() - if "BUNDLE" in ln - ] - log.info( - f"[{name}] SFU offer:" - f" {len(m_lines)} m-lines:" - f" {m_lines}" - ) - log.info( - f"[{name}] SFU BUNDLE:" - f" {bundle_lines}" - ) - - sdp_file = ( - f"research/{name.lower()}" - "_sub_offer.sdp" - ) - with open(sdp_file, "w") as f: - f.write(sdp) - log.info( - f"[{name}] SDP сохранён в {sdp_file}" - ) - - rtc_config = RTCConfiguration( - iceServers=ice_servers_config, - bundlePolicy=RTCBundlePolicy.MAX_BUNDLE, - ) - pc_sub = RTCPeerConnection( - configuration=rtc_config - ) - - @pc_sub.on("connectionstatechange") - def on_sub_state() -> None: - state = pc_sub.connectionState - log.info( - f"[{name}] Subscriber PC: {state}" - ) - if state == "connected": - sub_ready.set() - - @pc_sub.on("datachannel") - def on_sub_dc(channel: object) -> None: - log.info( - f"[{name}] Subscriber DC:" - f" {channel.label}" - ) - if channel.label != "_reliable": - return - - @channel.on("message") - def on_sub_msg(message: object) -> None: - if isinstance(message, str): - raw = message.encode() - else: - raw = bytes(message) - - parsed = decode_data_packet(raw) - if parsed is None: - log.debug( - f"[{name}] Не DataPacket:" - f" {raw[:40]!r}" - ) - return - - payload, topic = parsed - if topic != "poc": - return - - text = payload.decode( - errors="replace" - ) - stats["received"] += 1 - stats["messages"].append( - ("received", text, time.time()) - ) - log.info( - f"[{name}] Получено:" - f" {text!r}" - ) - - if ( - is_server - and dc_pub is not None - ): - resp = f"Echo: {text}" - pkt = encode_data_packet( - resp.encode(), topic="poc" - ) + if ev == "join-response": group_id = p.get("participantGroup", {}).get("groupId") + elif ev == "media-out" and m == "rtc:config": + for s in p.get("configuration", {}).get("iceServers", []): + urls = [u for u in s.get("urls", []) if "transport=udp" in u] + if urls: ice_servers.append(RTCIceServer(urls=[urls[0]], username=s.get("username"), credential=s.get("credential"))) + + elif ev == "media-out" and m == "rtc:offer" and not peer["pc_sub"]: + peer["pc_sub"] = RTCPeerConnection(configuration=RTCConfiguration(iceServers=ice_servers, bundlePolicy=RTCBundlePolicy.MAX_BUNDLE)) + @peer["pc_sub"].on("connectionstatechange") + def _(): + if peer["pc_sub"].connectionState == "connected": peer["sub_ready"].set() + + @peer["pc_sub"].on("datachannel") + def on_dc(ch): + if ch.label != "_reliable": return + @ch.on("message") + def on_msg(msg_data): + parsed = decode_data_packet(msg_data if isinstance(msg_data, bytes) else msg_data.encode()) + if not parsed or parsed[1] != "poc": return + stats["recv"] += 1 + if is_server and peer["dc"]: try: - dc_pub.send(pkt) + peer["dc"].send(encode_data_packet(f"Echo: {parsed[0].decode()}".encode(), "poc")) stats["sent"] += 1 - stats["messages"].append( - ( - "sent", - resp, - time.time(), - ) - ) - except Exception: - log.exception( - f"[{name}]" - " Ошибка отправки echo" - ) - - @pc_sub.on("icecandidate") - async def on_sub_ice( - event_obj: object, - ) -> None: - if not ( - event_obj - and event_obj.candidate - and group_id - ): - return - c = event_obj.candidate - await ws.send_json({ - "roomId": room_id, - "event": "media-in", - "groupId": group_id, - "requestId": gen_uuid(), - "payload": { - "method": "rtc:ice", - "rtcIceCandidates": [{ - "candidate": c.candidate, - "sdpMid": c.sdpMid, - "sdpMLineIndex": ( - c.sdpMLineIndex - ), - "usernameFragment": "", - "target": "SUBSCRIBER", - }], - }, - }) - - await pc_sub.setRemoteDescription( - RTCSessionDescription( - sdp=sdp, type=sdp_type - ) - ) - answer = await pc_sub.createAnswer() - await pc_sub.setLocalDescription(answer) - - ans_m = [ - ln for ln in answer.sdp.splitlines() - if ln.startswith("m=") - ] - ans_bundle = [ - ln for ln in answer.sdp.splitlines() - if "BUNDLE" in ln - ] - log.info( - f"[{name}] Our answer:" - f" {len(ans_m)} m-lines:" - f" {ans_m}" - ) - log.info( - f"[{name}] Answer BUNDLE:" - f" {ans_bundle}" - ) - - ans_file = ( - f"research/{name.lower()}" - "_sub_answer.sdp" - ) - with open(ans_file, "w") as f: - f.write(answer.sdp) - - await ws.send_json( - { - "roomId": room_id, - "event": "media-in", - "groupId": group_id, - "requestId": gen_uuid(), - "payload": { - "method": "rtc:answer", - "description": { - "type": "answer", - "sdp": pc_sub.localDescription.sdp, - }, - }, - } - ) - log.info( - f"[{name}] Subscriber SDP answer отправлен" - ) - - for c in pending_ice_sub: - ice = parse_ice_candidate(c) - if ice: - await pc_sub.addIceCandidate(ice) - pending_ice_sub.clear() - + except: pass + + @peer["pc_sub"].on("icecandidate") + async def on_sub_ice(e): + if e and e.candidate and group_id: + await ws.send_json({"roomId": room["roomId"], "event": "media-in", "groupId": group_id, "requestId": str(uuid.uuid4()), "payload": {"method": "rtc:ice", "rtcIceCandidates": [{"candidate": e.candidate.candidate, "sdpMid": e.candidate.sdpMid, "sdpMLineIndex": e.candidate.sdpMLineIndex, "usernameFragment": "", "target": "SUBSCRIBER"}]}}) + + await peer["pc_sub"].setRemoteDescription(RTCSessionDescription(sdp=p["description"]["sdp"], type="offer")) + ans = await peer["pc_sub"].createAnswer() + await peer["pc_sub"].setLocalDescription(ans) + await ws.send_json({"roomId": room["roomId"], "event": "media-in", "groupId": group_id, "requestId": str(uuid.uuid4()), "payload": {"method": "rtc:answer", "description": {"type": "answer", "sdp": peer["pc_sub"].localDescription.sdp}}}) + for c in p_ice_sub: + pts = c.get("candidate","").split() + if len(pts) >= 8: await peer["pc_sub"].addIceCandidate(RTCIceCandidate(int(pts[1]), pts[0].split(":")[1], pts[4], int(pts[5]), int(pts[3]), pts[2], pts[7], str(c.get("sdpMid", "0")), c.get("sdpMLineIndex", 0))) + p_ice_sub.clear() await asyncio.sleep(0.3) + + # pub + peer["pc_pub"] = RTCPeerConnection(configuration=RTCConfiguration(iceServers=ice_servers, bundlePolicy=RTCBundlePolicy.MAX_BUNDLE)) + peer["pc_pub"].addTrack(AudioStreamTrack()) + peer["dc"] = peer["pc_pub"].createDataChannel("_reliable", ordered=True) + + @peer["dc"].on("open") + def on_open(): peer["ready"].set() + + @peer["dc"].on("message") + def on_pub_msg(msg_data): + parsed = decode_data_packet(msg_data if isinstance(msg_data, bytes) else msg_data.encode()) + if parsed and parsed[1] == "poc": stats["recv"] += 1 + + @peer["pc_pub"].on("icecandidate") + async def on_pub_ice(e): + if e and e.candidate and group_id: + await ws.send_json({"roomId": room["roomId"], "event": "media-in", "groupId": group_id, "requestId": str(uuid.uuid4()), "payload": {"method": "rtc:ice", "rtcIceCandidates": [{"candidate": e.candidate.candidate, "sdpMid": e.candidate.sdpMid, "sdpMLineIndex": e.candidate.sdpMLineIndex, "usernameFragment": "", "target": "PUBLISHER"}]}}) + + await ws.send_json({"roomId": room["roomId"], "event": "media-in", "groupId": group_id, "requestId": str(uuid.uuid4()), "payload": {"method": "rtc:track:add", "cid": str(uuid.uuid4()), "track": {"type": "AUDIO", "source": "MICROPHONE", "muted": True}}}) + pub_offer = await peer["pc_pub"].createOffer() + await peer["pc_pub"].setLocalDescription(pub_offer) + await ws.send_json({"roomId": room["roomId"], "event": "media-in", "groupId": group_id, "requestId": str(uuid.uuid4()), "payload": {"method": "rtc:offer", "description": {"type": "offer", "sdp": peer["pc_pub"].localDescription.sdp}}}) - # --- publisher --- - if not publisher_offer_sent: - pc_pub = RTCPeerConnection( - configuration=RTCConfiguration( - iceServers=ice_servers_config, - bundlePolicy=RTCBundlePolicy.MAX_BUNDLE, - ) - ) + elif ev == "media-out" and m == "rtc:answer" and peer["pc_pub"]: + await peer["pc_pub"].setRemoteDescription(RTCSessionDescription(sdp=p["description"]["sdp"], type="answer")) + for c in p_ice_pub: + pts = c.get("candidate","").split() + if len(pts) >= 8: await peer["pc_pub"].addIceCandidate(RTCIceCandidate(int(pts[1]), pts[0].split(":")[1], pts[4], int(pts[5]), int(pts[3]), pts[2], pts[7], str(c.get("sdpMid", "0")), c.get("sdpMLineIndex", 0))) + p_ice_pub.clear() + + elif ev == "media-out" and m == "rtc:ice": + for c in p.get("rtcIceCandidates", []): + pts = c.get("candidate","").split() + if len(pts) < 8: continue + ice = RTCIceCandidate(int(pts[1]), pts[0].split(":")[1], pts[4], int(pts[5]), int(pts[3]), pts[2], pts[7], str(c.get("sdpMid", "0")), c.get("sdpMLineIndex", 0)) + tgt = c.get("target") + if tgt == "SUBSCRIBER": (await peer["pc_sub"].addIceCandidate(ice)) if peer["pc_sub"] else p_ice_sub.append(c) + elif tgt == "PUBLISHER": (await peer["pc_pub"].addIceCandidate(ice)) if peer["pc_pub"] else p_ice_pub.append(c) - @pc_pub.on("connectionstatechange") - def on_pub_state() -> None: - log.info( - f"[{name}] Publisher PC:" - f" {pc_pub.connectionState}" - ) - - audio_track = AudioStreamTrack() - pc_pub.addTrack(audio_track) - - dc_pub_local = pc_pub.createDataChannel( - "_reliable", ordered=True - ) - - @dc_pub_local.on("open") - def on_dc_open() -> None: - log.info( - f"[{name}] Publisher DC open" - ) - dc_ready.set() - - @dc_pub_local.on("message") - def on_dc_msg(message: object) -> None: - if isinstance(message, str): - raw = message.encode() - else: - raw = bytes(message) - parsed = decode_data_packet(raw) - if parsed is None: - return - payload, topic = parsed - if topic != "poc": - return - text = payload.decode( - errors="replace" - ) - stats["received"] += 1 - stats["messages"].append( - ("received", text, time.time()) - ) - log.info( - f"[{name}] DC pub получено:" - f" {text!r}" - ) - - dc_pub = dc_pub_local - - await ws.send_json( - { - "roomId": room_id, - "event": "media-in", - "groupId": group_id, - "requestId": gen_uuid(), - "payload": { - "method": "rtc:track:add", - "cid": gen_uuid(), - "track": { - "type": "AUDIO", - "source": "MICROPHONE", - "muted": True, - "disableDtx": False, - }, - }, - } - ) - log.info( - f"[{name}] rtc:track:add отправлен" - ) - - offer = await pc_pub.createOffer() - await pc_pub.setLocalDescription(offer) - - pub_m = [ - ln for ln in offer.sdp.splitlines() - if ln.startswith("m=") - ] - log.info( - f"[{name}] Pub offer:" - f" {len(pub_m)} m-lines:" - f" {pub_m}" - ) - with open( - f"research/{name.lower()}" - "_pub_offer.sdp", "w" - ) as f: - f.write(offer.sdp) - - await ws.send_json( - { - "roomId": room_id, - "event": "media-in", - "groupId": group_id, - "requestId": gen_uuid(), - "payload": { - "method": "rtc:offer", - "description": { - "type": "offer", - "sdp": pc_pub.localDescription.sdp, - }, - }, - } - ) - log.info( - f"[{name}] Publisher SDP offer" - " отправлен" - ) - - @pc_pub.on("icecandidate") - async def on_pub_ice( - event_obj: object, - ) -> None: - if not ( - event_obj - and event_obj.candidate - and group_id - ): - return - c = event_obj.candidate - await ws.send_json({ - "roomId": room_id, - "event": "media-in", - "groupId": group_id, - "requestId": gen_uuid(), - "payload": { - "method": "rtc:ice", - "rtcIceCandidates": [{ - "candidate": ( - c.candidate - ), - "sdpMid": c.sdpMid, - "sdpMLineIndex": ( - c.sdpMLineIndex - ), - "usernameFragment": "", - "target": "PUBLISHER", - }], - }, - }) - - publisher_offer_sent = True - - elif ( - event == "media-out" - and method == "rtc:answer" - ): - desc = payload.get("description", {}) - sdp = desc.get("sdp", "") - sdp_type = desc.get("type", "answer") - - if pc_pub is not None: - await pc_pub.setRemoteDescription( - RTCSessionDescription( - sdp=sdp, type=sdp_type - ) - ) - log.info( - f"[{name}] Publisher SDP answer" - " получен" - ) - - for c in pending_ice_pub: - ice = parse_ice_candidate(c) - if ice: - await pc_pub.addIceCandidate( - ice - ) - pending_ice_pub.clear() - else: - log.warning( - f"[{name}] rtc:answer получен," - " но pc_pub ещё не создан" - ) - - elif event == "media-out" and method == "rtc:ice": - candidates = payload.get( - "rtcIceCandidates", [] - ) - for c in candidates: - target = c.get("target", "") - ice = parse_ice_candidate(c) - if ice is None: - continue - log.info( - f"[{name}] ICE от SFU:" - f" {target}" - f" {c.get('candidate', '')[:60]}" - ) - - if ( - target == "SUBSCRIBER" - and sfu_ip is None - and ice.ip - ): - sfu_ip = ice.ip - sfu_sub = _subnet_prefix(sfu_ip) - subnet_ok = ( - sfu_sub == relay_subnet - if relay_subnet - else None - ) - log.info( - f"[{name}] SFU IP={sfu_ip}" - f" (подсеть {sfu_sub})," - f" relay подсеть" - f"={relay_subnet}," - f" совпадение={subnet_ok}" - ) - subnet_checked.set() - if subnet_ok is False: - log.warning( - f"[{name}] MISMATCH" - " подсетей! Нужен" - " reconnect." - ) - break - - if target == "SUBSCRIBER": - if pc_sub: - try: - await pc_sub.addIceCandidate( - ice - ) - except Exception as e: - log.warning( - f"[{name}] Sub ICE" - f" ошибка: {e}" - ) - else: - pending_ice_sub.append(c) - elif target == "PUBLISHER": - if pc_pub: - try: - await pc_pub.addIceCandidate( - ice - ) - except Exception as e: - log.warning( - f"[{name}] Pub ICE" - f" ошибка: {e}" - ) - else: - pending_ice_pub.append(c) - - elif ( - event == "media-out" - and method == "rtc:track:published" - ): - log.info( - f"[{name}] Трек опубликован" - ) - - elif ( - event == "media-out" - and method == "rtc:pong" - ): - pass - - elif ( - event == "media-out" - and method == "rtc:participants:update" - ): - pass - - elif ( - event == "media-out" - and method == "rtc:quality" - ): - pass - - elif event in ( - "hand-statuses", - "get-chat-messages-response", - ): - pass - - elif event == "error": - code = payload.get("code", "") - message = payload.get("message", "") - log.warning( - f"[{name}] Ошибка: {code} — {message}" - ) - - else: - log.debug( - f"[{name}] Необработано:" - f" event={event} method={method}" - ) - - elif msg.type in ( - aiohttp.WSMsgType.CLOSED, - aiohttp.WSMsgType.ERROR, - ): - log.warning(f"[{name}] WS закрыт/ошибка") - break - - ws_task = asyncio.create_task(ws_loop()) - - # --- keepalive --- - async def keepalive() -> None: - rtt = 0 + async def _keep(): while not ws.closed: await asyncio.sleep(5) - if group_id and not ws.closed: - ts = int(time.time() * 1000) - await ws.send_json( - { - "roomId": room_id, - "event": "media-in", - "groupId": group_id, - "requestId": gen_uuid(), - "payload": { - "method": "rtc:ping", - "ping_req": { - "timestamp": ts, - "rtt": rtt, - }, - }, - } - ) + if group_id: await ws.send_json({"roomId": room["roomId"], "event": "media-in", "groupId": group_id, "requestId": str(uuid.uuid4()), "payload": {"method": "rtc:ping", "ping_req": {"timestamp": int(time.time()*1000), "rtt": 0}}}) - keepalive_task = asyncio.create_task(keepalive()) + peer["task"] = asyncio.create_task(ws_loop()) + peer["keep"] = asyncio.create_task(_keep()) + return peer - return { - "name": name, - "dc_pub": lambda: dc_pub, - "dc_ready": dc_ready, - "sub_ready": sub_ready, - "subnet_checked": subnet_checked, - "subnet_ok": lambda: subnet_ok, - "sfu_subnet": lambda: ( - _subnet_prefix(sfu_ip) if sfu_ip else "" - ), - "all_turn_urls": all_turn_urls_saved, - "stats": stats, - "ws": ws, - "ws_task": ws_task, - "keepalive_task": keepalive_task, - "pc_sub": lambda: pc_sub, - "pc_pub": lambda: pc_pub, - } - - -async def cleanup_peer(peer: dict[str, object]) -> None: - """Корректно закрывает ресурсы пира.""" - for key in ("ws_task", "keepalive_task"): - task = peer.get(key) - if task and isinstance(task, asyncio.Task): - task.cancel() - ws = peer.get("ws") - if ws: - try: - await ws.close() # type: ignore[union-attr] - except Exception: - pass - for getter_key in ("pc_sub", "pc_pub"): - getter = peer.get(getter_key) - if callable(getter): - pc = getter() - if pc: - try: - await pc.close() - except Exception: - pass - - -MAX_RECONNECTS = 2 - - -async def run_poc() -> None: - """Основной сценарий PoC.""" - log.info("Jazz DataChannel PoC") - log.info("DataChannel через SaluteJazz SFU (LiveKit)") - - peers: list[dict[str, object]] = [] - try: - success = await _run_attempt(peers) - finally: - for p in peers: - await cleanup_peer(p) - - if not success: - log.error( - "Подключение не удалось." - " Возможно, проблема с сетью/TURN." - ) - - -async def _run_attempt( - peers: list[dict[str, object]], -) -> bool: - """Попытка подключения с быстрым reconnect при mismatch.""" +async def run_poc() -> dict: + print("\n--- SaluteJazz PoC ---") + results = {"server_ok": False, "client_ok": False, "sent": 0, "recv": 0, "errors": []} + s_stats, c_stats = {"sent": 0, "recv": 0}, {"sent": 0, "recv": 0} + async with aiohttp.ClientSession() as session: - log.info("[1/3] Создание комнаты...") try: - room = await create_room(session) - log.info( - f"Комната создана:" - f" roomId={room['roomId']}," - f" password={room['password']}" - ) + r = await session.post(f"{API_BASE}/room/create-meeting", headers=JAZZ_HEADERS, json={"title": "PoC", "guestEnabled": True, "lobbyEnabled": False}) + rj = await r.json() + r2 = await session.post(f"{API_BASE}/room/{rj['roomId']}/preconnect", headers=JAZZ_HEADERS, json={"password": rj["password"], "jazzNextMigration": {"b2bBaseRoomSupport": True, "demoRoomBaseSupport": True, "demoRoomVersionSupport": 2, "mediaWithoutAutoSubscribeSupport": True}}) + room_inf = {"roomId": rj["roomId"], "password": rj["password"], "connectorUrl": (await r2.json())["connectorUrl"]} except Exception as e: - log.error(f"Ошибка создания комнаты: {e}") - return False + results["errors"].append(f"Auth fail: {e}") + return results - known_subnet = "" - - for reconnect_i in range(MAX_RECONNECTS + 1): - phase = "Фаза 1" if not known_subnet else "Фаза 2" - if reconnect_i > 0: - log.info( - f"--- {phase}: reconnect" - f" #{reconnect_i}" - f" (подсеть SFU={known_subnet}) ---" - ) - - ok = await _connect_peers( - room, session, peers, - known_sfu_subnet=known_subnet, - ) - if ok: - return True - - last = peers[-1] if peers else None - if ( - last - and callable(last.get("subnet_ok")) - and last["subnet_ok"]() is False - ): - sfu_sub_fn = last.get("sfu_subnet") - discovered = ( - sfu_sub_fn() - if callable(sfu_sub_fn) - else "" - ) - if discovered and discovered != known_subnet: - known_subnet = discovered - log.info( - f"Обнаружена подсеть SFU:" - f" {known_subnet}, reconnect" - ) - for p in peers: - await cleanup_peer(p) - peers.clear() - continue - - return False - - return False - - -async def _connect_peers( - room: dict[str, str], - session: aiohttp.ClientSession, - peers: list[dict[str, object]], - known_sfu_subnet: str = "", -) -> bool: - """Подключение обоих пиров и обмен сообщениями.""" - log.info("[2/3] Подключение пиров...") - - log.info("Подключение Server...") - try: - server = await create_peer( - "Server", room, session, - is_server=True, - known_sfu_subnet=known_sfu_subnet, - ) - peers.append(server) - - done, _ = await asyncio.wait( - [ - asyncio.ensure_future( - server["dc_ready"].wait() - ), - asyncio.ensure_future( - server["subnet_checked"].wait() - ), - ], - timeout=30.0, - return_when=asyncio.FIRST_COMPLETED, - ) - - if server["subnet_checked"].is_set(): - if ( - callable(server.get("subnet_ok")) - and server["subnet_ok"]() is False - ): - log.warning( - "MISMATCH подсетей" - " — быстрый reconnect" - ) - return False - - if not server["dc_ready"].is_set(): - await asyncio.wait_for( - server["dc_ready"].wait(), timeout=15.0 - ) - log.info("Server: DataChannel open") - except asyncio.TimeoutError: - log.error( - "Server DataChannel timeout" - " (TURN/ICE не прошёл)" - ) - return False - except Exception as e: - log.error(f"Ошибка Server: {e}") - return False - - sfu_sub_fn = server.get("sfu_subnet") - server_sfu_subnet = ( - sfu_sub_fn() - if callable(sfu_sub_fn) - else "" - ) - - log.info("Подключение Client...") - try: - client = await create_peer( - "Client", room, session, - is_server=False, - known_sfu_subnet=( - server_sfu_subnet or known_sfu_subnet - ), - ) - peers.append(client) - await asyncio.wait_for( - client["dc_ready"].wait(), timeout=15.0 - ) - log.info("Client: DataChannel open") - except asyncio.TimeoutError: - log.error("Client DataChannel timeout") - return False - except Exception as e: - log.error(f"Ошибка Client: {e}") - return False - - log.info("Ожидание subscriber connected...") - try: - await asyncio.wait_for( - server["sub_ready"].wait(), timeout=30.0 - ) - log.info("Server subscriber: connected") - except asyncio.TimeoutError: - log.warning( - "Server subscriber" - " не подключился (timeout 30s)" - ) - - try: - await asyncio.wait_for( - client["sub_ready"].wait(), timeout=15.0 - ) - log.info("Client subscriber: connected") - except asyncio.TimeoutError: - log.warning( - "Client subscriber" - " не подключился (timeout 15s)" - ) - - log.info("[3/3] Обмен сообщениями...") - await asyncio.sleep(2) - - test_messages = [ - "Hello Jazz DC!", - "Тестовое сообщение на русском", - "X" * 100, - "Final test", - ] - - dc = client["dc_pub"]() - if dc is None: - log.error("DataChannel клиента не создан") - return False - - pc_pub_client = client["pc_pub"]() - if ( - pc_pub_client is None - or pc_pub_client.connectionState - in ("closed", "failed") - ): - log.error( - f"Publisher PC клиента уже" - f" {pc_pub_client and pc_pub_client.connectionState}" - ) - return False - - for i, text in enumerate(test_messages, 1): - pkt = encode_data_packet( - text.encode(), topic="poc" - ) + print("[1/3] Connecting Server & Client...") try: - dc.send(pkt) + server = await _create_peer("Server", room_inf, session, is_server=True, stats=s_stats) + await asyncio.wait_for(server["ready"].wait(), 15.0) + results["server_ok"] = True + + client = await _create_peer("Client", room_inf, session, is_server=False, stats=c_stats) + await asyncio.wait_for(client["ready"].wait(), 15.0) + results["client_ok"] = True + print(" :P Peers connected") except Exception as e: - log.error(f"Ошибка отправки: {e}") - return False - client["stats"]["sent"] += 1 - log.info( - f"Отправлено {i}/{len(test_messages)}" - f" ({len(text)}b → {len(pkt)}b protobuf)" - ) - await asyncio.sleep(0.5) + results["errors"].append(str(e)) + return results - log.info("Ожидание ответов (10с)...") - await asyncio.sleep(10) + print("\n[2/3] Exchanging messages...") + await asyncio.sleep(1) + for idx, msg in enumerate(TEST_MESSAGES, 1): + try: + client["dc"].send(encode_data_packet(msg.encode(), "poc")) + c_stats["sent"] += 1 + print(f" -> Sent: {msg}") + await asyncio.sleep(0.5) + except Exception as e: + results["errors"].append(f"Sending {idx} failed: {str(e)}") - received = client["stats"]["received"] - sent = client["stats"]["sent"] + await asyncio.sleep(3) + results["sent"], results["recv"] = c_stats["sent"], c_stats["recv"] + + print("\n[3/3] Cleaning up...") + for p in (server, client): + for t in ["task", "keep"]: p[t].cancel() + await p["ws"].close() + for pc in [p["pc_sub"], p["pc_pub"]]: + if pc: await pc.close() - log.info(f"Отправлено: {sent}") - log.info(f"Получено ответов: {received}") - - if received > 0: - log.info("ТЕСТ ПРОЙДЕН — DataChannel работает!") - return True - - log.error("ТЕСТ НЕ ПРОЙДЕН — ответы не получены") - srv_recv = server["stats"]["received"] - log.info(f"Server получил: {srv_recv} сообщений") - return False + return results +def print_results(res: dict): + print("\n--- TEST RESULTS ---") + print(f"Server: {':P' if res['server_ok'] else 'X'} / Client: {':P' if res['client_ok'] else 'X'}") + print(f"Messages: Sent {res['sent']} / Recv {res['recv']}") + if res['errors']: + for e in res['errors']: print(f" Error: {e}") + print(f"\n{':P SUCCESS' if res['sent'] and res['sent'] == res['recv'] else 'X FAILED'}\n") if __name__ == "__main__": - try: - asyncio.run(run_poc()) - except KeyboardInterrupt: - log.info("Прервано.") + try: res = asyncio.run(run_poc()); print_results(res) + except KeyboardInterrupt: pass diff --git a/code/telemost_poc.py b/code/telemost_poc.py index 0eed8c1..6b65d60 100755 --- a/code/telemost_poc.py +++ b/code/telemost_poc.py @@ -1,366 +1,168 @@ #!/usr/bin/env python3 +"""PoC: Yandex Telemost DataChannel via Websocket and AIORTC.""" import asyncio import json import uuid import time -import websockets import requests +import websockets from urllib.parse import quote from aiortc import RTCPeerConnection, RTCSessionDescription, RTCIceCandidate, RTCConfiguration, RTCIceServer CONFERENCE_ID = "75047680642749" -CONFERENCE_URL = f"https://telemost.yandex.ru/j/{CONFERENCE_ID}" API_BASE = "https://cloud-api.yandex.ru/telemost_front/v2/telemost" +ICE_SERVER = RTCIceServer(urls=["stun:stun.rtc.yandex.net:3478"]) +TEST_MESSAGES = ["Hello Yandex Telemost!", "Hello world", "X" * 100, "Final test"] -def generate_uuid(): - return str(uuid.uuid4()) +def _gen_uuid() -> str: return str(uuid.uuid4()) -def get_connection_info(display_name): - url = f"{API_BASE}/conferences/{quote(CONFERENCE_URL, safe='')}/connection" - params = { - "next_gen_media_platform_allowed": "true", - "display_name": display_name, - "waiting_room_supported": "true" - } - +def _get_conn_info(display_name: str) -> dict: + url = f"{API_BASE}/conferences/{quote(f'https://telemost.yandex.ru/j/{CONFERENCE_ID}', safe='')}/connection" headers = { - "User-Agent": "Mozilla/5.0 (X11; Linux x86_64; rv:149.0) Gecko/20100101 Firefox/149.0", - "Accept": "*/*", - "content-type": "application/json", - "Client-Instance-Id": generate_uuid(), + "User-Agent": "Mozilla/5.0 (Linux x86_64)", + "Client-Instance-Id": _gen_uuid(), "X-Telemost-Client-Version": "187.1.0", - "idempotency-key": generate_uuid(), - "Origin": "https://telemost.yandex.ru", - "Referer": "https://telemost.yandex.ru/" + "idempotency-key": _gen_uuid(), } - - response = requests.get(url, params=params, headers=headers) - response.raise_for_status() - return response.json() + params = {"next_gen_media_platform_allowed": "true", "display_name": display_name, "waiting_room_supported": "true"} + resp = requests.get(url, params=params, headers=headers) + resp.raise_for_status() + return resp.json() -async def create_peer(name, is_server=False): - conn_info = get_connection_info(name) - room_id = conn_info["room_id"] - peer_id = conn_info["peer_id"] - credentials = conn_info["credentials"] - ws_url = conn_info["client_configuration"]["media_server_url"] - - pc_sub = RTCPeerConnection(RTCConfiguration( - iceServers=[RTCIceServer(urls=["stun:stun.rtc.yandex.net:3478"])] - )) - - pc_pub = RTCPeerConnection(RTCConfiguration( - iceServers=[RTCIceServer(urls=["stun:stun.rtc.yandex.net:3478"])] - )) - +async def _create_peer(name: str, is_server: bool = False, stats: dict = None) -> dict: + info = _get_conn_info(name) + ws = await websockets.connect(info["client_configuration"]["media_server_url"]) + pc_sub = RTCPeerConnection(RTCConfiguration(iceServers=[ICE_SERVER])) + pc_pub = RTCPeerConnection(RTCConfiguration(iceServers=[ICE_SERVER])) dc_pub = pc_pub.createDataChannel("olcrtc", ordered=True) - dc_pub_open = asyncio.Event() - - stats = { - "sent": 0, - "received": 0, - "bytes_sent": 0, - "bytes_received": 0, - "messages": [] - } - + dc_ready = asyncio.Event() + @dc_pub.on("open") - def on_pub_open(): - dc_pub_open.set() + def on_open(): dc_ready.set() @dc_pub.on("message") - def on_pub_msg(msg): - stats["received"] += 1 - stats["bytes_received"] += len(msg) - stats["messages"].append(("received", msg, time.time())) - + def on_pub_msg(msg): stats["recv"] += 1 + @pc_sub.on("datachannel") def on_sub_dc(channel): @channel.on("message") - def on_message(message): - stats["received"] += 1 - stats["bytes_received"] += len(message) - stats["messages"].append(("received", message, time.time())) - + def on_msg(m): + stats["recv"] += 1 if is_server and channel.label == "olcrtc": - response = f"Echo: {message}" try: - dc_pub.send(response) + dc_pub.send(f"Echo: {m}") stats["sent"] += 1 - stats["bytes_sent"] += len(response) - stats["messages"].append(("sent", response, time.time())) - except: - pass - - ws = await websockets.connect(ws_url) - - hello_msg = { - "uid": generate_uuid(), + except: pass + + await ws.send(json.dumps({ + "uid": _gen_uuid(), "hello": { "participantMeta": {"name": name, "role": "SPEAKER", "sendAudio": False, "sendVideo": False}, "participantAttributes": {"name": name, "role": "SPEAKER"}, - "sendAudio": False, - "sendVideo": False, - "sendSharing": False, - "participantId": peer_id, - "roomId": room_id, - "serviceName": "telemost", - "credentials": credentials, - "capabilitiesOffer": { - "offerAnswerMode": ["SEPARATE"], - "initialSubscriberOffer": ["ON_HELLO"], - "slotsMode": ["FROM_CONTROLLER"], - "simulcastMode": ["DISABLED"], - "selfVadStatus": ["FROM_SERVER"], - "dataChannelSharing": ["TO_RTP"] - }, + "sendAudio": False, "sendVideo": False, "sendSharing": False, + "participantId": info["peer_id"], "roomId": info["room_id"], + "serviceName": "telemost", "credentials": info["credentials"], + "capabilitiesOffer": {"offerAnswerMode": ["SEPARATE"], "initialSubscriberOffer": ["ON_HELLO"], "slotsMode": ["FROM_CONTROLLER"], "simulcastMode": ["DISABLED"], "selfVadStatus": ["FROM_SERVER"], "dataChannelSharing": ["TO_RTP"]}, "sdkInfo": {"implementation": "python", "version": "1.0.0", "userAgent": f"OlcRTC-{name}"}, - "sdkInitializationId": generate_uuid(), - "disablePublisher": False, - "disableSubscriber": False + "sdkInitializationId": _gen_uuid(), "disablePublisher": False, "disableSubscriber": False } - } - - await ws.send(json.dumps(hello_msg)) - - publisher_sdp_sent = False - - async def ws_handler(): - nonlocal publisher_sdp_sent + })) + + async def _ws_loop(): + pub_sdp_sent = False while True: try: data = json.loads(await ws.recv()) + uid = data.get("uid") if "serverHello" in data: - await ws.send(json.dumps({"uid": data["uid"], "ack": {"status": {"code": "OK"}}})) - - if "subscriberSdpOffer" in data and not publisher_sdp_sent: - await pc_sub.setRemoteDescription(RTCSessionDescription( - sdp=data["subscriberSdpOffer"]["sdp"], type="offer" - )) + await ws.send(json.dumps({"uid": uid, "ack": {"status": {"code": "OK"}}})) - answer = await pc_sub.createAnswer() - await pc_sub.setLocalDescription(answer) - - await ws.send(json.dumps({ - "uid": generate_uuid(), - "subscriberSdpAnswer": { - "pcSeq": data["subscriberSdpOffer"]["pcSeq"], - "sdp": pc_sub.localDescription.sdp - } - })) - - await ws.send(json.dumps({"uid": data["uid"], "ack": {"status": {"code": "OK"}}})) + elif "subscriberSdpOffer" in data and not pub_sdp_sent: + sdp = data["subscriberSdpOffer"] + await pc_sub.setRemoteDescription(RTCSessionDescription(sdp=sdp["sdp"], type="offer")) + ans = await pc_sub.createAnswer() + await pc_sub.setLocalDescription(ans) + await ws.send(json.dumps({"uid": _gen_uuid(), "subscriberSdpAnswer": {"pcSeq": sdp["pcSeq"], "sdp": pc_sub.localDescription.sdp}})) + await ws.send(json.dumps({"uid": uid, "ack": {"status": {"code": "OK"}}})) await asyncio.sleep(0.3) - pub_offer = await pc_pub.createOffer() await pc_pub.setLocalDescription(pub_offer) + await ws.send(json.dumps({"uid": _gen_uuid(), "publisherSdpOffer": {"pcSeq": 1, "sdp": pc_pub.localDescription.sdp}})) + pub_sdp_sent = True - await ws.send(json.dumps({ - "uid": generate_uuid(), - "publisherSdpOffer": { - "pcSeq": 1, - "sdp": pc_pub.localDescription.sdp - } - })) - publisher_sdp_sent = True - - if "publisherSdpAnswer" in data: - await pc_pub.setRemoteDescription(RTCSessionDescription( - sdp=data["publisherSdpAnswer"]["sdp"], type="answer" - )) - await ws.send(json.dumps({"uid": data["uid"], "ack": {"status": {"code": "OK"}}})) - - if "webrtcIceCandidate" in data: + elif "publisherSdpAnswer" in data: + await pc_pub.setRemoteDescription(RTCSessionDescription(sdp=data["publisherSdpAnswer"]["sdp"], type="answer")) + await ws.send(json.dumps({"uid": uid, "ack": {"status": {"code": "OK"}}})) + + elif "webrtcIceCandidate" in data: cand = data["webrtcIceCandidate"] - try: - parts = cand["candidate"].split() - if len(parts) >= 8: - ice = RTCIceCandidate( - component=int(parts[1]), - foundation=parts[0].replace("candidate:", ""), - ip=parts[4], - port=int(parts[5]), - priority=int(parts[3]), - protocol=parts[2], - type=parts[7], - sdpMid=cand["sdpMid"], - sdpMLineIndex=cand["sdpMlineIndex"] - ) - - if cand.get("target") == "SUBSCRIBER": - await pc_sub.addIceCandidate(ice) - elif cand.get("target") == "PUBLISHER": - await pc_pub.addIceCandidate(ice) - except: - pass - - except: - break - - @pc_sub.on("icecandidate") - async def on_sub_ice(event): - if event.candidate: - await ws.send(json.dumps({ - "uid": generate_uuid(), - "webrtcIceCandidate": { - "candidate": event.candidate.candidate, - "sdpMid": event.candidate.sdpMid, - "sdpMlineIndex": event.candidate.sdpMLineIndex, - "target": "SUBSCRIBER", - "pcSeq": 1 - } - })) - - @pc_pub.on("icecandidate") - async def on_pub_ice(event): - if event.candidate: - await ws.send(json.dumps({ - "uid": generate_uuid(), - "webrtcIceCandidate": { - "candidate": event.candidate.candidate, - "sdpMid": event.candidate.sdpMid, - "sdpMlineIndex": event.candidate.sdpMLineIndex, - "target": "PUBLISHER", - "pcSeq": 1 - } - })) - - ws_task = asyncio.create_task(ws_handler()) - - return { - "name": name, - "dc_pub": dc_pub, - "dc_pub_open": dc_pub_open, - "stats": stats, - "ws": ws, - "ws_task": ws_task, - "pc_sub": pc_sub, - "pc_pub": pc_pub - } + parts = cand["candidate"].split() + if len(parts) >= 8: + ice = RTCIceCandidate(component=int(parts[1]), foundation=parts[0].replace("candidate:", ""), ip=parts[4], port=int(parts[5]), priority=int(parts[3]), protocol=parts[2], type=parts[7], sdpMid=cand["sdpMid"], sdpMLineIndex=cand["sdpMlineIndex"]) + await (pc_sub if cand.get("target") == "SUBSCRIBER" else pc_pub).addIceCandidate(ice) + except Exception: break -async def run_full_test(): - print(r""" - OlcRTC - Full Test Suite - DataChannel over Yandex Telemost SFU - by zowue for olc -""") + async def _on_ice(event, target): + if event.candidate: + await ws.send(json.dumps({"uid": _gen_uuid(), "webrtcIceCandidate": {"candidate": event.candidate.candidate, "sdpMid": event.candidate.sdpMid, "sdpMlineIndex": event.candidate.sdpMLineIndex, "target": target, "pcSeq": 1}})) + + pc_sub.on("icecandidate", lambda e: asyncio.create_task(_on_ice(e, "SUBSCRIBER"))) + pc_pub.on("icecandidate", lambda e: asyncio.create_task(_on_ice(e, "PUBLISHER"))) - results = { - "server_connected": False, - "client_connected": False, - "messages_sent": 0, - "messages_received": 0, - "bytes_sent": 0, - "bytes_received": 0, - "latency_ms": [], - "errors": [] - } + return {"dc": dc_pub, "ready": dc_ready, "task": asyncio.create_task(_ws_loop()), "ws": ws, "pc_sub": pc_sub, "pc_pub": pc_pub} + +async def run_poc() -> dict: + print("\n--- Yandex Telemost PoC ---") + results = {"server_ok": False, "client_ok": False, "sent": 0, "recv": 0, "errors": []} + s_stats, c_stats = {"sent": 0, "recv": 0}, {"sent": 0, "recv": 0} - print("[1/4] Creating server peer...") + print("[1/3] Connecting Server & Client...") try: - server = await create_peer("Server", is_server=True) - await asyncio.wait_for(server["dc_pub_open"].wait(), timeout=10.0) - results["server_connected"] = True - print(" :P Server connected") + server = await _create_peer("Server", is_server=True, stats=s_stats) + await asyncio.wait_for(server["ready"].wait(), 10.0) + results["server_ok"] = True + + client = await _create_peer("Client", is_server=False, stats=c_stats) + await asyncio.wait_for(client["ready"].wait(), 10.0) + results["client_ok"] = True + print(" :P Peers connected") except Exception as e: - results["errors"].append(f"Server failed: {e}") - print(f" X Error: {e}") + results["errors"].append(str(e)) return results - - print("\n[2/4] Creating client peer...") - try: - client = await create_peer("Client", is_server=False) - await asyncio.wait_for(client["dc_pub_open"].wait(), timeout=10.0) - results["client_connected"] = True - print(" :P Client connected") - except Exception as e: - results["errors"].append(f"Client failed: {e}") - print(f" X Error: {e}") - return results - - print("\n[3/4] Testing message exchange...") - await asyncio.sleep(2) - - test_messages = [ - "Hello OlcRTC!", - "я всего лиш хотел дружить зачем тролякатся", - "X" * 100, - "Final test" - ] - - try: - for i, msg in enumerate(test_messages, 1): - send_time = time.time() - client["dc_pub"].send(msg) - client["stats"]["sent"] += 1 - client["stats"]["bytes_sent"] += len(msg) - print(f" -> Sent message {i}/{len(test_messages)} ({len(msg)}b)") + + print("\n[2/3] Exchanging messages...") + await asyncio.sleep(1) + for idx, msg in enumerate(TEST_MESSAGES, 1): + try: + client["dc"].send(msg) + c_stats["sent"] += 1 + print(f" -> Sent: {msg}") await asyncio.sleep(0.5) - - await asyncio.sleep(3) - - results["messages_sent"] = client["stats"]["sent"] - results["messages_received"] = client["stats"]["received"] - results["bytes_sent"] = client["stats"]["bytes_sent"] - results["bytes_received"] = client["stats"]["bytes_received"] - - print(f" :P Sent: {results['messages_sent']} messages") - print(f" :P Received: {results['messages_received']} responses") - - except Exception as e: - results["errors"].append(f"Exchange failed: {e}") - print(f" X Error: {e}") - - print("\n[4/4] Cleaning up...") - try: - server["ws_task"].cancel() - client["ws_task"].cancel() - await server["ws"].close() - await client["ws"].close() - await server["pc_sub"].close() - await server["pc_pub"].close() - await client["pc_sub"].close() - await client["pc_pub"].close() - print(" :P Cleanup complete") - except: - pass + except Exception as e: + results["errors"].append(f"Sending {idx} failed: {str(e)}") + + await asyncio.sleep(2) + results["sent"], results["recv"] = c_stats["sent"], c_stats["recv"] + print("\n[3/3] Cleaning up...") + for p in (server, client): + p["task"].cancel() + await p["ws"].close() + await p["pc_sub"].close() + await p["pc_pub"].close() + return results -def print_results(results): - print(r""" - TEST RESULTS -""") - - print("Connection Status:") - print(f" - Server: {':P Connected' if results['server_connected'] else 'X Failed'}") - print(f" - Client: {':P Connected' if results['client_connected'] else 'X Failed'}") - - print("\nMessage Exchange:") - print(f" - Sent: {results['messages_sent']} messages ({results['bytes_sent']} bytes)") - print(f" - Received: {results['messages_received']} messages ({results['bytes_received']} bytes)") - - success_rate = (results['messages_received'] / results['messages_sent'] * 100) if results['messages_sent'] > 0 else 0 - print(f" - Success Rate: {success_rate:.1f}%") - - if results['errors']: - print("\nErrors:") - for err in results['errors']: - print(f" - {err}") - - if results['messages_received'] > 0: - print("\n :P TEST PASSED - OlcRTC PoC works!") - else: - print("\n X TEST FAILED - Check errors above") - -async def main(): - results = await run_full_test() - print_results(results) +def print_results(res: dict): + print("\n--- TEST RESULTS ---") + print(f"Server: {':P' if res['server_ok'] else 'X'} / Client: {':P' if res['client_ok'] else 'X'}") + print(f"Messages: Sent {res['sent']} / Recv {res['recv']}") + if res['errors']: + for e in res['errors']: print(f" Error: {e}") + print(f"\n{':P SUCCESS' if res['sent'] and res['sent'] == res['recv'] else 'X FAILED'}\n") if __name__ == "__main__": - try: - asyncio.run(main()) - except KeyboardInterrupt: - print("\n\nTest interrupted.") + try: res = asyncio.run(run_poc()); print_results(res) + except KeyboardInterrupt: pass diff --git a/code/wbstream_poc.py b/code/wbstream_poc.py old mode 100644 new mode 100755 index 4ae9d9b..c024952 --- a/code/wbstream_poc.py +++ b/code/wbstream_poc.py @@ -1,218 +1,113 @@ #!/usr/bin/env python3 -"""PoC: передача произвольных данных через WB Stream SFU (LiveKit). - -В отличие от Яндекс Телемоста или SaluteJazz, WB Stream использует -стандартный немодифицированный протокол LiveKit (v16). Это означает, -что нам не нужно собирать кастомные WebSocket-обработчики или костыли -с Protobuf/JSON. Мы можем использовать официальную библиотеку livekit. -""" +"""PoC: WB Stream DataChannel over LiveKit.""" import asyncio import logging -import time import uuid import requests try: from livekit import rtc except ImportError: - print("\n[!] Ошибка: не установлена библиотека livekit.") - print("Выполните: pip install livekit requests\n") + print("[!] Error: livekit library not installed.\nRun: pip install livekit requests") exit(1) logging.getLogger("livekit").setLevel(logging.WARNING) API_BASE = "https://stream.wb.ru" WS_URL = "wss://wbstream01-el.wb.ru:7880" +TEST_MESSAGES = ["Hello WB Stream!", "Hello world", "X" * 100, "Final test"] -def generate_uuid(): - return str(uuid.uuid4()) - -def get_room_token(room_id: str, display_name: str) -> str: +def _get_room_token(room_id: str, display_name: str) -> tuple[str, str]: + """Retrieves the room token via the guest API.""" headers = { - "User-Agent": "Mozilla/5.0 (X11; Linux x86_64; rv:149.0)", - "Accept": "application/json, text/plain, */*", + "User-Agent": "Mozilla/5.0 (Linux x86_64)", "Content-Type": "application/json" } - - # 0. Гостевая регистрация для получения Bearer токена - reg_resp = requests.post( + + # 1. Guest Registration + reg_req = requests.post( f"{API_BASE}/auth/api/v1/auth/user/guest-register", - json={"displayName": display_name, "device": {"deviceName": "Linux Device", "deviceType": "PARTICIPANT_DEVICE_TYPE_WEB_DESKTOP"}}, + json={"displayName": display_name, "device": {"deviceName": "Linux", "deviceType": "PARTICIPANT_DEVICE_TYPE_WEB_DESKTOP"}}, headers=headers ) - reg_resp.raise_for_status() - access_token = reg_resp.json()["accessToken"] - headers["Authorization"] = f"Bearer {access_token}" + reg_req.raise_for_status() + headers["Authorization"] = f"Bearer {reg_req.json()['accessToken']}" + # 2. Room Creation (if empty) if not room_id: - # 1. Создаем комнату (только для первого пира) - resp = requests.post( - f"{API_BASE}/api-room/api/v2/room", - json={"roomType": "ROOM_TYPE_ALL_ON_SCREEN", "roomPrivacy": "ROOM_PRIVACY_FREE"}, - headers=headers - ) - resp.raise_for_status() - room_id = resp.json()["roomId"] - - # 2. Джойнимся - resp = requests.post(f"{API_BASE}/api-room/api/v1/room/{room_id}/join", json={}, headers=headers) - resp.raise_for_status() - - # 3. Получаем токен - params = { - "deviceType": "PARTICIPANT_DEVICE_TYPE_WEB_DESKTOP", - "displayName": display_name - } - resp = requests.get( - f"{API_BASE}/api-room-manager/api/v1/room/{room_id}/token", - params=params, - headers=headers - ) - resp.raise_for_status() - - return room_id, resp.json()["roomToken"] + room_req = requests.post(f"{API_BASE}/api-room/api/v2/room", json={"roomType": "ROOM_TYPE_ALL_ON_SCREEN", "roomPrivacy": "ROOM_PRIVACY_FREE"}, headers=headers) + room_req.raise_for_status() + room_id = room_req.json()["roomId"] + + # 3. Join & Token + requests.post(f"{API_BASE}/api-room/api/v1/room/{room_id}/join", json={}, headers=headers).raise_for_status() + tok_req = requests.get(f"{API_BASE}/api-room-manager/api/v1/room/{room_id}/token", params={"deviceType": "PARTICIPANT_DEVICE_TYPE_WEB_DESKTOP", "displayName": display_name}, headers=headers) + tok_req.raise_for_status() + return room_id, tok_req.json()["roomToken"] -async def run_full_test(): - print(r""" - OlcRTC - Full Test Suite - DataChannel over WB Stream SFU (LiveKit) - by zowue for olc -""") +async def run_poc() -> dict: + """Runs the complete PoC flow.""" + print("\n--- WB Stream PoC ---") + results = {"server_ok": False, "client_ok": False, "sent": 0, "recv": 0, "errors": []} - results = { - "server_connected": False, - "client_connected": False, - "messages_sent": 0, - "messages_received": 0, - "bytes_sent": 0, - "bytes_received": 0, - "errors": [] - } - - print("[1/4] Creating server peer...") + server, client = rtc.Room(), rtc.Room() + shared_room_id, _ = _get_room_token("", "OlcRTC-Server") + + print("[1/3] Connecting Server & Client...") try: - shared_room_id, server_token = get_room_token("", "OlcRTC-Server") - - server_room = rtc.Room() - server_stats = {"sent": 0, "received": 0} - - @server_room.on("data_received") - def on_s_data(dp: rtc.DataPacket): - text = dp.data.decode('utf-8', errors='replace') - server_stats["received"] += 1 - if dp.topic == "olcrtc_poc": - resp_text = f"Echo: {text}" - resp_data = resp_text.encode('utf-8') - asyncio.create_task(server_room.local_participant.publish_data(resp_data, topic="olcrtc_poc")) - server_stats["sent"] += 1 + shared_room_id, server_tok = _get_room_token("", "OlcRTC-Server") + _, client_tok = _get_room_token(shared_room_id, "OlcRTC-Client") - await server_room.connect(WS_URL, server_token) - results["server_connected"] = True - print(f" :P Server connected (Room: {shared_room_id})") + @server.on("data_received") + def on_server_data(dp: rtc.DataPacket): + if dp.topic == "olcrtc": + asyncio.create_task(server.local_participant.publish_data(f"Echo: {dp.data.decode()}".encode(), topic="olcrtc")) + + @client.on("data_received") + def on_client_data(dp: rtc.DataPacket): + results["recv"] += 1 + + await server.connect(WS_URL, server_tok) + results["server_ok"] = True + await client.connect(WS_URL, client_tok) + results["client_ok"] = True + print(f" :P Peers connected to room: {shared_room_id}") except Exception as e: - results["errors"].append(f"Server failed: {e}") - print(f" X Error: {e}") + results["errors"].append(str(e)) return results - - print("\n[2/4] Creating client peer...") - try: - _, client_token = get_room_token(shared_room_id, "OlcRTC-Client") - - client_room = rtc.Room() - client_stats = {"sent": 0, "received": 0, "bytes_sent": 0, "bytes_received": 0} - @client_room.on("data_received") - def on_c_data(dp: rtc.DataPacket): - text = dp.data.decode('utf-8', errors='replace') - client_stats["received"] += 1 - client_stats["bytes_received"] += len(dp.data) - - await client_room.connect(WS_URL, client_token) - results["client_connected"] = True - print(" :P Client connected") - except Exception as e: - results["errors"].append(f"Client failed: {e}") - print(f" X Error: {e}") - return results + print("\n[2/3] Exchanging messages...") + await asyncio.sleep(1) - print("\n[3/4] Testing message exchange...") - await asyncio.sleep(2) # Даем время LiveKit поднять WebRTC транспорт - - test_messages = [ - "Hello WB Stream!", - "я всего лиш хотел дружить зачем тролякатся", - "X" * 100, - "Final test" - ] - - try: - for i, msg in enumerate(test_messages, 1): - msg_bytes = msg.encode('utf-8') - # Отправляем DataPacket через LiveKit - await client_room.local_participant.publish_data(msg_bytes, topic="olcrtc_poc") - client_stats["sent"] += 1 - client_stats["bytes_sent"] += len(msg_bytes) - print(f" -> Sent message {i}/{len(test_messages)} ({len(msg_bytes)}b)") + for idx, msg in enumerate(TEST_MESSAGES, 1): + try: + await client.local_participant.publish_data(msg.encode(), topic="olcrtc") + results["sent"] += 1 + print(f" -> Sent: {msg}") await asyncio.sleep(0.5) - - await asyncio.sleep(3) - - results["messages_sent"] = client_stats["sent"] - results["messages_received"] = client_stats["received"] - results["bytes_sent"] = client_stats["bytes_sent"] - results["bytes_received"] = client_stats["bytes_received"] - - print(f" :P Sent: {results['messages_sent']} messages") - print(f" :P Received: {results['messages_received']} responses") - - except Exception as e: - results["errors"].append(f"Exchange failed: {e}") - print(f" X Error: {e}") + except Exception as e: + results["errors"].append(f"Sending {idx} failed: {str(e)}") + + await asyncio.sleep(2) - print("\n[4/4] Cleaning up...") - try: - await server_room.disconnect() - await client_room.disconnect() - print(" :P Cleanup complete") - except: - pass + print("\n[3/3] Cleaning up...") + await server.disconnect() + await client.disconnect() return results -def print_results(results): - print(r""" - TEST RESULTS -""") - - print("Connection Status:") - print(f" - Server: {':P Connected' if results.get('server_connected') else 'X Failed'}") - print(f" - Client: {':P Connected' if results.get('client_connected') else 'X Failed'}") - - print("\nMessage Exchange:") - print(f" - Sent: {results.get('messages_sent', 0)} messages ({results.get('bytes_sent', 0)} bytes)") - print(f" - Received: {results.get('messages_received', 0)} messages ({results.get('bytes_received', 0)} bytes)") - - sent = results.get('messages_sent', 0) - success_rate = (results.get('messages_received', 0) / sent * 100) if sent > 0 else 0 - print(f" - Success Rate: {success_rate:.1f}%") - - if results.get('errors'): - print("\nErrors:") - for err in results['errors']: - print(f" - {err}") - - if results.get('messages_received', 0) > 0: - print("\n :P TEST PASSED - WB Stream PoC works!") - else: - print("\n X TEST FAILED - Check errors above") - -async def main(): - results = await run_full_test() - print_results(results) +def print_results(res: dict): + print("\n--- TEST RESULTS ---") + print(f"Server: {':P' if res['server_ok'] else 'X'} / Client: {':P' if res['client_ok'] else 'X'}") + print(f"Messages: Sent {res['sent']} / Recv {res['recv']}") + if res['errors']: + for e in res['errors']: print(f" Error: {e}") + print(f"\n{':P SUCCESS' if res['sent'] and res['sent'] == res['recv'] else 'X FAILED'}\n") if __name__ == "__main__": try: - asyncio.run(main()) + res = asyncio.run(run_poc()) + print_results(res) except KeyboardInterrupt: - print("\n\nTest interrupted.") + pass