mirror of
https://github.com/openlibrecommunity/olcrtc.git
synced 2026-05-26 15:13:40 +00:00
513 lines
20 KiB
Python
Executable File
513 lines
20 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""PoC: Yandex Telemost VideoChannel via aiortc (WB-style flow)."""
|
|
|
|
import asyncio
|
|
import base64
|
|
import hashlib
|
|
import json
|
|
import uuid
|
|
import zlib
|
|
from fractions import Fraction
|
|
from urllib.parse import quote
|
|
|
|
import cv2
|
|
import numpy as np
|
|
import qrcode
|
|
import requests
|
|
import websockets
|
|
from aiortc import (
|
|
RTCConfiguration,
|
|
RTCIceCandidate,
|
|
RTCIceServer,
|
|
RTCPeerConnection,
|
|
RTCSessionDescription,
|
|
)
|
|
from aiortc.mediastreams import MediaStreamTrack
|
|
from av import VideoFrame
|
|
from pyzbar.pyzbar import decode as qr_decode
|
|
|
|
CONFERENCE_ID = "75047680642749"
|
|
CONFERENCE_URL = f"https://telemost.yandex.ru/j/{CONFERENCE_ID}"
|
|
API_BASE = "https://cloud-api.yandex.ru/telemost_front/v2/telemost"
|
|
FPS = 10
|
|
TEST_MESSAGES = [
|
|
"Hello Telemost via Video!",
|
|
"Packed JSON payload test.",
|
|
"X" * 200,
|
|
"Final VideoChannel test",
|
|
]
|
|
|
|
|
|
def _uid() -> str:
|
|
return str(uuid.uuid4())
|
|
|
|
|
|
def _encode(text: str) -> np.ndarray:
|
|
payload = base64.b64encode(zlib.compress(text.encode())).decode()
|
|
qr = qrcode.QRCode(error_correction=qrcode.constants.ERROR_CORRECT_L, box_size=8, border=3)
|
|
qr.add_data(payload)
|
|
qr.make(fit=True)
|
|
arr = np.array(qr.make_image(fill_color="black", back_color="white").convert("RGB"), dtype=np.uint8)
|
|
h, w = arr.shape[:2]
|
|
return arr if (h % 2 == 0 and w % 2 == 0) else cv2.resize(arr, (w + w % 2, h + h % 2))
|
|
|
|
|
|
def _decode(frame: VideoFrame) -> str | None:
|
|
arr = frame.to_ndarray(format="rgb24")
|
|
gray = cv2.cvtColor(arr, cv2.COLOR_RGB2GRAY)
|
|
for img in [gray, cv2.resize(gray, (gray.shape[1] * 2, gray.shape[0] * 2), interpolation=cv2.INTER_CUBIC)]:
|
|
for obj in qr_decode(img):
|
|
try:
|
|
return zlib.decompress(base64.b64decode(obj.data)).decode()
|
|
except Exception:
|
|
pass
|
|
return None
|
|
|
|
|
|
def _get_conn_info(display_name: str) -> dict:
|
|
url = f"{API_BASE}/conferences/{quote(CONFERENCE_URL, safe='')}/connection"
|
|
params = {
|
|
"next_gen_media_platform_allowed": "true",
|
|
"display_name": display_name,
|
|
"waiting_room_supported": "true",
|
|
}
|
|
headers = {
|
|
"User-Agent": "Mozilla/5.0 (X11; Linux x86_64; rv:149.0) Gecko/20100101 Firefox/149.0",
|
|
"Accept": "*/*",
|
|
"content-type": "application/json",
|
|
"Client-Instance-Id": _uid(),
|
|
"X-Telemost-Client-Version": "187.1.0",
|
|
"idempotency-key": _uid(),
|
|
"Origin": "https://telemost.yandex.ru",
|
|
"Referer": "https://telemost.yandex.ru/",
|
|
}
|
|
resp = requests.get(url, params=params, headers=headers)
|
|
resp.raise_for_status()
|
|
return resp.json()
|
|
|
|
|
|
def _make_ice_servers(raw_list: list[dict]) -> list[RTCIceServer]:
|
|
result = []
|
|
for server in raw_list:
|
|
urls = server.get("urls", [])
|
|
cred = server.get("credential", "")
|
|
user = server.get("username", "")
|
|
if cred:
|
|
result.append(RTCIceServer(urls=urls, credential=cred, username=user))
|
|
else:
|
|
result.append(RTCIceServer(urls=urls))
|
|
return result or [RTCIceServer(urls=["stun:stun.rtc.yandex.net:3478"])]
|
|
|
|
|
|
class _VideoChannelTrack(MediaStreamTrack):
|
|
kind = "video"
|
|
|
|
def __init__(self):
|
|
super().__init__()
|
|
self._frame = _encode("IDLE")
|
|
self._pts = 0
|
|
|
|
def set_frame(self, arr: np.ndarray) -> None:
|
|
self._frame = arr
|
|
|
|
async def recv(self) -> VideoFrame:
|
|
await asyncio.sleep(1.0 / FPS)
|
|
frame = VideoFrame.from_ndarray(self._frame, format="rgb24")
|
|
frame.pts = self._pts
|
|
frame.time_base = Fraction(1, FPS)
|
|
self._pts += 1
|
|
return frame
|
|
|
|
|
|
async def _connect_peer(name: str, conn: dict, is_sender: bool, on_video_message=None) -> dict:
|
|
default_ice = [RTCIceServer(urls=["stun:stun.rtc.yandex.net:3478"])]
|
|
track = _VideoChannelTrack() if is_sender else None
|
|
|
|
pc_sub_ref = [RTCPeerConnection(RTCConfiguration(iceServers=default_ice))]
|
|
pc_pub_ref = [RTCPeerConnection(RTCConfiguration(iceServers=default_ice))]
|
|
if is_sender:
|
|
pc_pub_ref[0].addTrack(track)
|
|
|
|
ws = await websockets.connect(
|
|
conn["client_configuration"]["media_server_url"],
|
|
additional_headers={
|
|
"Origin": "https://telemost.yandex.ru",
|
|
"User-Agent": "Mozilla/5.0 (X11; Linux x86_64; rv:149.0) Gecko/20100101 Firefox/149.0",
|
|
},
|
|
ping_interval=None,
|
|
)
|
|
|
|
subscriber_connected = asyncio.Event()
|
|
publisher_connected = asyncio.Event()
|
|
|
|
async def _send(obj: dict) -> None:
|
|
await ws.send(json.dumps(obj))
|
|
|
|
async def _ack(uid: str) -> None:
|
|
await _send({"uid": uid, "ack": {"status": {"code": "OK", "description": ""}}})
|
|
|
|
def _setup(pc_sub, pc_pub) -> None:
|
|
@pc_sub.on("track")
|
|
def on_track(remote_track):
|
|
if remote_track.kind != "video" or on_video_message is None:
|
|
return
|
|
|
|
async def _loop():
|
|
last_hash = None
|
|
while True:
|
|
try:
|
|
frame = await asyncio.wait_for(remote_track.recv(), timeout=30.0)
|
|
frame_hash = hashlib.md5(frame.to_ndarray(format="rgb24").tobytes()).hexdigest()
|
|
if frame_hash == last_hash:
|
|
continue
|
|
last_hash = frame_hash
|
|
msg = _decode(frame)
|
|
if msg:
|
|
on_video_message(msg)
|
|
except Exception:
|
|
return
|
|
|
|
asyncio.create_task(_loop())
|
|
|
|
@pc_sub.on("connectionstatechange")
|
|
async def on_sub_state():
|
|
if pc_sub.connectionState == "connected":
|
|
subscriber_connected.set()
|
|
|
|
@pc_pub.on("connectionstatechange")
|
|
async def on_pub_state():
|
|
if pc_pub.connectionState == "connected":
|
|
publisher_connected.set()
|
|
|
|
@pc_sub.on("icecandidate")
|
|
async def on_sub_ice(event):
|
|
if event.candidate:
|
|
await _send({
|
|
"uid": _uid(),
|
|
"webrtcIceCandidate": {
|
|
"candidate": event.candidate.candidate,
|
|
"sdpMid": event.candidate.sdpMid,
|
|
"sdpMlineIndex": event.candidate.sdpMLineIndex,
|
|
"usernameFragment": "",
|
|
"target": "SUBSCRIBER",
|
|
"pcSeq": 1,
|
|
},
|
|
})
|
|
|
|
@pc_pub.on("icecandidate")
|
|
async def on_pub_ice(event):
|
|
if event.candidate:
|
|
await _send({
|
|
"uid": _uid(),
|
|
"webrtcIceCandidate": {
|
|
"candidate": event.candidate.candidate,
|
|
"sdpMid": event.candidate.sdpMid,
|
|
"sdpMlineIndex": event.candidate.sdpMLineIndex,
|
|
"usernameFragment": "",
|
|
"target": "PUBLISHER",
|
|
"pcSeq": 1,
|
|
},
|
|
})
|
|
|
|
_setup(pc_sub_ref[0], pc_pub_ref[0])
|
|
|
|
await _send({
|
|
"uid": _uid(),
|
|
"hello": {
|
|
"participantMeta": {
|
|
"name": name,
|
|
"role": "SPEAKER",
|
|
"description": "",
|
|
"sendAudio": False,
|
|
"sendVideo": is_sender,
|
|
},
|
|
"participantAttributes": {
|
|
"name": name,
|
|
"role": "SPEAKER",
|
|
"description": "",
|
|
},
|
|
"sendAudio": False,
|
|
"sendVideo": is_sender,
|
|
"sendSharing": False,
|
|
"participantId": conn["peer_id"],
|
|
"roomId": conn["room_id"],
|
|
"serviceName": "telemost",
|
|
"credentials": conn["credentials"],
|
|
"capabilitiesOffer": {
|
|
"offerAnswerMode": ["SEPARATE"],
|
|
"initialSubscriberOffer": ["ON_HELLO"],
|
|
"slotsMode": ["FROM_CONTROLLER"],
|
|
"simulcastMode": ["DISABLED", "STATIC"],
|
|
"selfVadStatus": ["FROM_SERVER", "FROM_CLIENT"],
|
|
"dataChannelSharing": ["TO_RTP"],
|
|
"videoEncoderConfig": ["NO_CONFIG", "ONLY_INIT_CONFIG", "RUNTIME_CONFIG"],
|
|
"dataChannelVideoCodec": ["VP8", "UNIQUE_CODEC_FROM_TRACK_DESCRIPTION"],
|
|
"bandwidthLimitationReason": ["BANDWIDTH_REASON_DISABLED", "BANDWIDTH_REASON_ENABLED"],
|
|
"sdkDefaultDeviceManagement": ["SDK_DEFAULT_DEVICE_MANAGEMENT_DISABLED", "SDK_DEFAULT_DEVICE_MANAGEMENT_ENABLED"],
|
|
"joinOrderLayout": ["JOIN_ORDER_LAYOUT_DISABLED", "JOIN_ORDER_LAYOUT_ENABLED"],
|
|
"pinLayout": ["PIN_LAYOUT_DISABLED"],
|
|
"sendSelfViewVideoSlot": ["SEND_SELF_VIEW_VIDEO_SLOT_DISABLED", "SEND_SELF_VIEW_VIDEO_SLOT_ENABLED"],
|
|
"serverLayoutTransition": ["SERVER_LAYOUT_TRANSITION_DISABLED"],
|
|
"sdkPublisherOptimizeBitrate": [
|
|
"SDK_PUBLISHER_OPTIMIZE_BITRATE_DISABLED",
|
|
"SDK_PUBLISHER_OPTIMIZE_BITRATE_FULL",
|
|
"SDK_PUBLISHER_OPTIMIZE_BITRATE_ONLY_SELF",
|
|
],
|
|
"sdkNetworkLostDetection": ["SDK_NETWORK_LOST_DETECTION_DISABLED"],
|
|
"sdkNetworkPathMonitor": ["SDK_NETWORK_PATH_MONITOR_DISABLED"],
|
|
"publisherVp9": ["PUBLISH_VP9_DISABLED", "PUBLISH_VP9_ENABLED"],
|
|
"svcMode": ["SVC_MODE_DISABLED", "SVC_MODE_L3T3", "SVC_MODE_L3T3_KEY"],
|
|
"subscriberOfferAsyncAck": ["SUBSCRIBER_OFFER_ASYNC_ACK_DISABLED", "SUBSCRIBER_OFFER_ASYNC_ACK_ENABLED"],
|
|
"androidBluetoothRoutingFix": ["ANDROID_BLUETOOTH_ROUTING_FIX_DISABLED"],
|
|
"fixedIceCandidatesPoolSize": ["FIXED_ICE_CANDIDATES_POOL_SIZE_DISABLED"],
|
|
"sdkAndroidTelecomIntegration": ["SDK_ANDROID_TELECOM_INTEGRATION_DISABLED"],
|
|
"setActiveCodecsMode": ["SET_ACTIVE_CODECS_MODE_DISABLED", "SET_ACTIVE_CODECS_MODE_VIDEO_ONLY"],
|
|
"subscriberDtlsPassiveMode": ["SUBSCRIBER_DTLS_PASSIVE_MODE_DISABLED"],
|
|
"publisherOpusDred": ["PUBLISHER_OPUS_DRED_DISABLED"],
|
|
"publisherOpusLowBitrate": ["PUBLISHER_OPUS_LOW_BITRATE_DISABLED"],
|
|
"sdkAndroidDestroySessionOnTaskRemoved": ["SDK_ANDROID_DESTROY_SESSION_ON_TASK_REMOVED_DISABLED"],
|
|
"svcModes": ["FALSE"],
|
|
"reportTelemetryModes": ["TRUE"],
|
|
"keepDefaultDevicesModes": ["FALSE"],
|
|
},
|
|
"sdkInfo": {
|
|
"implementation": "browser",
|
|
"version": "5.27.0",
|
|
"userAgent": "Mozilla/5.0 (X11; Linux x86_64; rv:149.0) Gecko/20100101 Firefox/149.0",
|
|
"hwConcurrency": 24,
|
|
},
|
|
"sdkInitializationId": _uid(),
|
|
"disablePublisher": not is_sender,
|
|
"disableSubscriber": False,
|
|
"disableSubscriberAudio": True,
|
|
},
|
|
})
|
|
|
|
async def _ws_loop():
|
|
pub_sdp_sent = False
|
|
try:
|
|
async for raw in ws:
|
|
data = json.loads(raw)
|
|
uid = data.get("uid", "")
|
|
|
|
if "ack" in data:
|
|
continue
|
|
|
|
if "serverHello" in data:
|
|
raw_ice = data["serverHello"].get("rtcConfiguration", {}).get("iceServers", [])
|
|
if raw_ice:
|
|
ice = _make_ice_servers(raw_ice)
|
|
old_sub = pc_sub_ref[0]
|
|
old_pub = pc_pub_ref[0]
|
|
pc_sub_ref[0] = RTCPeerConnection(RTCConfiguration(iceServers=ice))
|
|
pc_pub_ref[0] = RTCPeerConnection(RTCConfiguration(iceServers=ice))
|
|
if is_sender and track is not None:
|
|
pc_pub_ref[0].addTrack(track)
|
|
_setup(pc_sub_ref[0], pc_pub_ref[0])
|
|
await old_sub.close()
|
|
await old_pub.close()
|
|
await _ack(uid)
|
|
continue
|
|
|
|
if "subscriberSdpOffer" in data:
|
|
sdp = data["subscriberSdpOffer"]
|
|
await pc_sub_ref[0].setRemoteDescription(
|
|
RTCSessionDescription(sdp=sdp["sdp"], type="offer")
|
|
)
|
|
answer = await pc_sub_ref[0].createAnswer()
|
|
await pc_sub_ref[0].setLocalDescription(answer)
|
|
await _send({
|
|
"uid": _uid(),
|
|
"subscriberSdpAnswer": {
|
|
"pcSeq": sdp["pcSeq"],
|
|
"sdp": pc_sub_ref[0].localDescription.sdp,
|
|
},
|
|
})
|
|
await _ack(uid)
|
|
|
|
if not is_sender:
|
|
await _send({
|
|
"uid": _uid(),
|
|
"setSlots": {
|
|
"slots": [{"width": 1280, "height": 720}, {"width": 640, "height": 360}],
|
|
"audioSlotsCount": 0,
|
|
"key": 1,
|
|
"shutdownAllVideo": None,
|
|
"withSelfView": False,
|
|
"selfViewVisibility": "ON_LOADING_THEN_SHOW",
|
|
"gridConfig": {},
|
|
},
|
|
})
|
|
|
|
if is_sender and not pub_sdp_sent:
|
|
await asyncio.sleep(0.3)
|
|
offer = await pc_pub_ref[0].createOffer()
|
|
await pc_pub_ref[0].setLocalDescription(offer)
|
|
tracks = [
|
|
{
|
|
"mid": t.mid,
|
|
"transceiverMid": t.mid,
|
|
"kind": t.sender.track.kind.upper(),
|
|
"priority": 0,
|
|
"label": "videochannel",
|
|
"codecs": {},
|
|
"groupId": 1,
|
|
"description": "",
|
|
}
|
|
for t in pc_pub_ref[0].getTransceivers()
|
|
if t.sender.track
|
|
]
|
|
await _send({
|
|
"uid": _uid(),
|
|
"publisherSdpOffer": {
|
|
"pcSeq": 1,
|
|
"sdp": pc_pub_ref[0].localDescription.sdp,
|
|
"tracks": tracks,
|
|
},
|
|
})
|
|
pub_sdp_sent = True
|
|
continue
|
|
|
|
if "publisherSdpAnswer" in data:
|
|
await pc_pub_ref[0].setRemoteDescription(
|
|
RTCSessionDescription(sdp=data["publisherSdpAnswer"]["sdp"], type="answer")
|
|
)
|
|
await _ack(uid)
|
|
continue
|
|
|
|
if "webrtcIceCandidate" in data:
|
|
cand = data["webrtcIceCandidate"]
|
|
candidate_str = cand.get("candidate", "")
|
|
parts = candidate_str.split()
|
|
if len(parts) < 8:
|
|
continue
|
|
try:
|
|
tcp_type = parts[parts.index("tcptype") + 1] if "tcptype" in parts else None
|
|
ice = RTCIceCandidate(
|
|
component=int(parts[1]),
|
|
foundation=parts[0].replace("candidate:", ""),
|
|
ip=parts[4],
|
|
port=int(parts[5]),
|
|
priority=int(parts[3]),
|
|
protocol=parts[2].lower(),
|
|
type=parts[7],
|
|
tcpType=tcp_type,
|
|
sdpMid=cand.get("sdpMid", "0"),
|
|
sdpMLineIndex=cand.get("sdpMlineIndex", 0),
|
|
)
|
|
if cand.get("target") == "SUBSCRIBER":
|
|
await pc_sub_ref[0].addIceCandidate(ice)
|
|
else:
|
|
await pc_pub_ref[0].addIceCandidate(ice)
|
|
except Exception:
|
|
pass
|
|
continue
|
|
|
|
if uid:
|
|
await _ack(uid)
|
|
except Exception:
|
|
pass
|
|
|
|
return {
|
|
"track": track,
|
|
"subscriber_connected": subscriber_connected,
|
|
"publisher_connected": publisher_connected,
|
|
"ws_task": asyncio.create_task(_ws_loop()),
|
|
"ws": ws,
|
|
"pc_sub_ref": pc_sub_ref,
|
|
"pc_pub_ref": pc_pub_ref,
|
|
}
|
|
|
|
|
|
async def run_poc() -> dict:
|
|
print("\n--- Yandex Telemost VideoChannel PoC ---")
|
|
results = {"server_ok": False, "client_ok": False, "sent": 0, "recv": 0, "errors": []}
|
|
recv_events = [asyncio.Event() for _ in TEST_MESSAGES]
|
|
last_message = [None]
|
|
|
|
print("[1/3] Connecting peers...")
|
|
try:
|
|
sender_conn = _get_conn_info("OlcRTC-Client")
|
|
receiver_conn = _get_conn_info("OlcRTC-Server")
|
|
|
|
def on_msg(msg: str) -> None:
|
|
last_message[0] = msg
|
|
for i, expected in enumerate(TEST_MESSAGES):
|
|
if msg == expected and not recv_events[i].is_set():
|
|
results["recv"] += 1
|
|
print(f" -> Recv: {msg[:60]}")
|
|
recv_events[i].set()
|
|
break
|
|
|
|
server = await _connect_peer("OlcRTC-Server", receiver_conn, is_sender=False, on_video_message=on_msg)
|
|
client = await _connect_peer("OlcRTC-Client", sender_conn, is_sender=True)
|
|
await asyncio.wait_for(server["subscriber_connected"].wait(), timeout=20.0)
|
|
await asyncio.wait_for(client["publisher_connected"].wait(), timeout=20.0)
|
|
results["server_ok"] = True
|
|
results["client_ok"] = True
|
|
print(" :P Peers connected")
|
|
except Exception as err:
|
|
results["errors"].append(str(err))
|
|
return results
|
|
|
|
print("\n[2/3] Publishing VideoChannel...")
|
|
current_frame = [_encode("IDLE")]
|
|
client["track"].set_frame(current_frame[0])
|
|
|
|
async def _push_frames():
|
|
while True:
|
|
client["track"].set_frame(current_frame[0])
|
|
await asyncio.sleep(1.0 / FPS)
|
|
|
|
push_task = asyncio.create_task(_push_frames())
|
|
|
|
print("\n[3/3] Sending messages...")
|
|
await asyncio.sleep(2)
|
|
|
|
for idx, msg in enumerate(TEST_MESSAGES):
|
|
try:
|
|
current_frame[0] = _encode(msg)
|
|
results["sent"] += 1
|
|
print(f" -> Sent: {msg[:60]}")
|
|
try:
|
|
await asyncio.wait_for(recv_events[idx].wait(), timeout=10.0)
|
|
except asyncio.TimeoutError:
|
|
if last_message[0] is not None:
|
|
results["errors"].append(
|
|
f"Timeout waiting for msg {idx + 1}, last recv={last_message[0][:30]!r}"
|
|
)
|
|
else:
|
|
results["errors"].append(f"Timeout waiting for msg {idx + 1}")
|
|
except Exception as err:
|
|
results["errors"].append(f"Send {idx + 1} failed: {err}")
|
|
|
|
push_task.cancel()
|
|
for peer in (server, client):
|
|
peer["ws_task"].cancel()
|
|
try:
|
|
await peer["ws"].close()
|
|
except Exception:
|
|
pass
|
|
try:
|
|
await peer["pc_sub_ref"][0].close()
|
|
await peer["pc_pub_ref"][0].close()
|
|
except Exception:
|
|
pass
|
|
return results
|
|
|
|
|
|
def print_results(res: dict):
|
|
print("\n--- TEST RESULTS ---")
|
|
print(f"Server: {':P' if res['server_ok'] else 'X'} / Client: {':P' if res['client_ok'] else 'X'}")
|
|
print(f"Messages: Sent {res['sent']} / Recv {res['recv']}")
|
|
for err in res.get("errors", []):
|
|
print(f" Error: {err}")
|
|
print(f"\n{':P SUCCESS' if res['sent'] and res['sent'] == res['recv'] else 'X FAILED'}\n")
|
|
|
|
|
|
if __name__ == "__main__":
|
|
try:
|
|
print_results(asyncio.run(run_poc()))
|
|
except KeyboardInterrupt:
|
|
pass
|