From d9606d6f7564a7e88f963f186d92167559fff802 Mon Sep 17 00:00:00 2001 From: zowue Date: Sun, 5 Apr 2026 14:02:31 +0300 Subject: [PATCH] feat(vcsend): Add WebRTC video QR code transfer utility --- code/vcsend.py | 625 +++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 625 insertions(+) create mode 100755 code/vcsend.py diff --git a/code/vcsend.py b/code/vcsend.py new file mode 100755 index 0000000..5cdfdbd --- /dev/null +++ b/code/vcsend.py @@ -0,0 +1,625 @@ +#!/usr/bin/env python3 + +import asyncio +import json +import uuid +import websockets +import requests +import qrcode +import cv2 +import numpy as np +from urllib.parse import quote +from aiortc import RTCPeerConnection, RTCSessionDescription, RTCIceCandidate, RTCConfiguration, RTCIceServer +from aiortc.mediastreams import MediaStreamTrack +from av import VideoFrame +import base64 +from PIL import Image +from pyzbar import pyzbar +from fractions import Fraction + +CONFERENCE_ID = "75047680642749" +CONFERENCE_URL = f"https://telemost.yandex.ru/j/{CONFERENCE_ID}" +API_BASE = "https://cloud-api.yandex.ru/telemost_front/v2/telemost" + +QR_SIZE = 600 +CHUNK_SIZE = 400 +FRAME_RATE = 1 + + +def gen_uid(): + return str(uuid.uuid4()) + + +def get_connection_info(display_name): + url = f"{API_BASE}/conferences/{quote(CONFERENCE_URL, safe='')}/connection" + params = { + "next_gen_media_platform_allowed": "true", + "display_name": display_name, + "waiting_room_supported": "true" + } + headers = { + "User-Agent": "Mozilla/5.0 (X11; Linux x86_64; rv:149.0) Gecko/20100101 Firefox/149.0", + "Accept": "*/*", + "content-type": "application/json", + "Client-Instance-Id": gen_uid(), + "X-Telemost-Client-Version": "187.1.0", + "idempotency-key": gen_uid(), + "Origin": "https://telemost.yandex.ru", + "Referer": "https://telemost.yandex.ru/" + } + r = requests.get(url, params=params, headers=headers) + r.raise_for_status() + return r.json() + + +def make_qr_frame(data, pts): + qr = qrcode.QRCode( + version=None, + error_correction=qrcode.constants.ERROR_CORRECT_M, + box_size=12, + border=4 + ) + qr.add_data(data) + qr.make(fit=True) + img = qr.make_image(fill_color="black", back_color="white").resize( + (QR_SIZE, QR_SIZE), Image.NEAREST + ) + arr = np.array(img.convert('RGB')) + frame = VideoFrame.from_ndarray(arr, format="rgb24") + frame.pts = pts + frame.time_base = Fraction(1, FRAME_RATE) + return frame + + +def chunk_data(data, tid): + b64 = base64.b64encode(data).decode() + n = (len(b64) + CHUNK_SIZE - 1) // CHUNK_SIZE + return [json.dumps({"tid": tid, "idx": i, "total": n, + "data": b64[i * CHUNK_SIZE:(i + 1) * CHUNK_SIZE]}) + for i in range(n)] + + +class QRVideoTrack(MediaStreamTrack): + kind = "video" + + def __init__(self): + super().__init__() + self._frames = [] + self._idx = 0 + self._pts = 0 + + def set_data(self, chunks): + self._frames = [make_qr_frame(c, i) for i, c in enumerate(chunks)] + self._idx = 0 + self._pts = 0 + print(f" -> QRVideoTrack: {len(self._frames)} frames ready") + + async def recv(self): + await asyncio.sleep(1.0 / FRAME_RATE) + if not self._frames: + f = make_qr_frame("WAIT", self._pts) + self._pts += 1 + return f + f = self._frames[self._idx] + f.pts = self._pts + f.time_base = Fraction(1, FRAME_RATE) + self._pts += 1 + self._idx = (self._idx + 1) % len(self._frames) + if self._pts % 10 == 0: + print(f" -> QR sending frame {self._idx}/{len(self._frames)} pts={self._pts}") + return f + + +class QRReceiver: + def __init__(self): + self._bufs = {} + self.result = None + self._frame_count = 0 + self._cv2_detector = cv2.QRCodeDetector() + + def feed_frame(self, frame): + try: + self._frame_count += 1 + arr = frame.to_ndarray(format="rgb24") + h, w = arr.shape[:2] + + if self._frame_count <= 3: + cv2.imwrite(f"/tmp/qr_recv_{self._frame_count}.png", + cv2.cvtColor(arr, cv2.COLOR_RGB2BGR)) + print(f" -> [recv] saved /tmp/qr_recv_{self._frame_count}.png {w}x{h}") + + gray = cv2.cvtColor(arr, cv2.COLOR_RGB2GRAY) + + variants = [gray] + up2 = cv2.resize(gray, (w * 2, h * 2), interpolation=cv2.INTER_CUBIC) + variants.append(up2) + _, thresh = cv2.threshold(gray, 0, 255, cv2.THRESH_BINARY + cv2.THRESH_OTSU) + variants.append(thresh) + variants.append(cv2.resize(thresh, (w * 2, h * 2), interpolation=cv2.INTER_NEAREST)) + + decoded = set() + for v in variants: + try: + for code in pyzbar.decode(v): + decoded.add(code.data.decode('utf-8')) + except Exception: + pass + try: + val, _, _ = self._cv2_detector.detectAndDecode(v) + if val: + decoded.add(val) + except Exception: + pass + + for raw in decoded: + try: + pkt = json.loads(raw) + tid = pkt["tid"] + idx = pkt["idx"] + total = pkt["total"] + data = pkt["data"] + if tid not in self._bufs: + self._bufs[tid] = {} + if idx not in self._bufs[tid]: + self._bufs[tid][idx] = data + print(f" -> QR chunk {idx+1}/{total} tid={tid[:8]}") + if len(self._bufs[tid]) == total: + b64 = "".join(self._bufs[tid][i] for i in range(total)) + self.result = base64.b64decode(b64) + print(f" -> QR COMPLETE: {len(self.result)} bytes") + return True + except Exception: + pass + + if self._frame_count % 30 == 0: + print(f" -> [recv] {self._frame_count} frames, no QR, size={w}x{h}") + + except Exception as e: + print(f" -> [recv] feed_frame err: {e}") + return False + + +async def process_track(track, receiver, name): + print(f" -> [{name}] video processor started") + count = 0 + while True: + try: + frame = await asyncio.wait_for(track.recv(), timeout=30.0) + count += 1 + if count <= 5 or count % 50 == 0: + print(f" -> [{name}] frame #{count} {frame.width}x{frame.height}") + if receiver.feed_frame(frame): + return + except asyncio.TimeoutError: + print(f" -> [{name}] track frozen after {count} frames") + return + except Exception as e: + print(f" -> [{name}] track err: {e}") + return + + +def make_ice_servers(raw_list): + result = [] + for s in raw_list: + urls = s.get("urls", []) + cred = s.get("credential", "") + user = s.get("username", "") + if cred: + result.append(RTCIceServer(urls=urls, credential=cred, username=user)) + else: + result.append(RTCIceServer(urls=urls)) + return result or [RTCIceServer(urls=["stun:stun.rtc.yandex.net:3478"])] + + +async def connect_peer(name, conn): + room_id = conn["room_id"] + peer_id = conn["peer_id"] + credentials = conn["credentials"] + ws_url = conn["client_configuration"]["media_server_url"] + is_sender = "Sender" in name + + print(f"\n -> [{name}] room={room_id} peer={peer_id[:8]} sender={is_sender}") + + default_ice = [RTCIceServer(urls=["stun:stun.rtc.yandex.net:3478"])] + + video_track = QRVideoTrack() if is_sender else None + receiver_obj = QRReceiver() if not is_sender else None + track_tasks = [] + + # используем списки чтобы можно было переприсваивать в замыканиях + pc_sub_ref = [RTCPeerConnection(RTCConfiguration(iceServers=default_ice))] + pc_pub_ref = [RTCPeerConnection(RTCConfiguration(iceServers=default_ice))] + + if is_sender: + pc_pub_ref[0].addTrack(video_track) + + ws = await websockets.connect( + ws_url, + additional_headers={ + "Origin": "https://telemost.yandex.ru", + "User-Agent": "Mozilla/5.0 (X11; Linux x86_64; rv:149.0) Gecko/20100101 Firefox/149.0" + } + ) + print(f" -> [{name}] WS connected") + + async def send(obj): + await ws.send(json.dumps(obj)) + + async def ack(uid): + await send({"uid": uid, "ack": {"status": {"code": "OK", "description": ""}}}) + + def setup_pc(pc_sub, pc_pub): + @pc_sub.on("track") + def on_track(track): + print(f" -> [{name}] GOT TRACK kind={track.kind}") + if track.kind == "video" and receiver_obj is not None: + t = asyncio.ensure_future(process_track(track, receiver_obj, name)) + track_tasks.append(t) + + @pc_sub.on("connectionstatechange") + async def _s(): + print(f" -> [{name}] sub={pc_sub.connectionState}") + + @pc_pub.on("connectionstatechange") + async def _p(): + print(f" -> [{name}] pub={pc_pub.connectionState}") + + @pc_sub.on("iceconnectionstatechange") + async def _si(): + print(f" -> [{name}] sub ICE={pc_sub.iceConnectionState}") + + @pc_pub.on("iceconnectionstatechange") + async def _pi(): + print(f" -> [{name}] pub ICE={pc_pub.iceConnectionState}") + + @pc_sub.on("icecandidate") + async def on_sub_ice(e): + if e.candidate: + await send({ + "uid": gen_uid(), + "webrtcIceCandidate": { + "candidate": e.candidate.candidate, + "sdpMid": e.candidate.sdpMid, + "sdpMlineIndex": e.candidate.sdpMLineIndex, + "usernameFragment": "", + "target": "SUBSCRIBER", + "pcSeq": 1 + } + }) + print(f" -> [{name}] >> sub ICE sent") + + @pc_pub.on("icecandidate") + async def on_pub_ice(e): + if e.candidate: + await send({ + "uid": gen_uid(), + "webrtcIceCandidate": { + "candidate": e.candidate.candidate, + "sdpMid": e.candidate.sdpMid, + "sdpMlineIndex": e.candidate.sdpMLineIndex, + "usernameFragment": "", + "target": "PUBLISHER", + "pcSeq": 1 + } + }) + print(f" -> [{name}] >> pub ICE sent") + + setup_pc(pc_sub_ref[0], pc_pub_ref[0]) + + hello = { + "uid": gen_uid(), + "hello": { + "participantMeta": { + "name": name, "role": "SPEAKER", "description": "", + "sendAudio": False, "sendVideo": is_sender + }, + "participantAttributes": {"name": name, "role": "SPEAKER", "description": ""}, + "sendAudio": False, + "sendVideo": is_sender, + "sendSharing": False, + "participantId": peer_id, + "roomId": room_id, + "serviceName": "telemost", + "credentials": credentials, + "capabilitiesOffer": { + "offerAnswerMode": ["SEPARATE"], + "initialSubscriberOffer": ["ON_HELLO"], + "slotsMode": ["FROM_CONTROLLER"], + "simulcastMode": ["DISABLED", "STATIC"], + "selfVadStatus": ["FROM_SERVER", "FROM_CLIENT"], + "dataChannelSharing": ["TO_RTP"], + "videoEncoderConfig": ["NO_CONFIG", "ONLY_INIT_CONFIG", "RUNTIME_CONFIG"], + "dataChannelVideoCodec": ["VP8", "UNIQUE_CODEC_FROM_TRACK_DESCRIPTION"], + "bandwidthLimitationReason": ["BANDWIDTH_REASON_DISABLED", "BANDWIDTH_REASON_ENABLED"], + "sdkDefaultDeviceManagement": ["SDK_DEFAULT_DEVICE_MANAGEMENT_DISABLED", "SDK_DEFAULT_DEVICE_MANAGEMENT_ENABLED"], + "joinOrderLayout": ["JOIN_ORDER_LAYOUT_DISABLED", "JOIN_ORDER_LAYOUT_ENABLED"], + "pinLayout": ["PIN_LAYOUT_DISABLED"], + "sendSelfViewVideoSlot": ["SEND_SELF_VIEW_VIDEO_SLOT_DISABLED", "SEND_SELF_VIEW_VIDEO_SLOT_ENABLED"], + "serverLayoutTransition": ["SERVER_LAYOUT_TRANSITION_DISABLED"], + "sdkPublisherOptimizeBitrate": ["SDK_PUBLISHER_OPTIMIZE_BITRATE_DISABLED", "SDK_PUBLISHER_OPTIMIZE_BITRATE_FULL", "SDK_PUBLISHER_OPTIMIZE_BITRATE_ONLY_SELF"], + "sdkNetworkLostDetection": ["SDK_NETWORK_LOST_DETECTION_DISABLED"], + "sdkNetworkPathMonitor": ["SDK_NETWORK_PATH_MONITOR_DISABLED"], + "publisherVp9": ["PUBLISH_VP9_DISABLED", "PUBLISH_VP9_ENABLED"], + "svcMode": ["SVC_MODE_DISABLED", "SVC_MODE_L3T3", "SVC_MODE_L3T3_KEY"], + "subscriberOfferAsyncAck": ["SUBSCRIBER_OFFER_ASYNC_ACK_DISABLED", "SUBSCRIBER_OFFER_ASYNC_ACK_ENABLED"], + "androidBluetoothRoutingFix": ["ANDROID_BLUETOOTH_ROUTING_FIX_DISABLED"], + "fixedIceCandidatesPoolSize": ["FIXED_ICE_CANDIDATES_POOL_SIZE_DISABLED"], + "sdkAndroidTelecomIntegration": ["SDK_ANDROID_TELECOM_INTEGRATION_DISABLED"], + "setActiveCodecsMode": ["SET_ACTIVE_CODECS_MODE_DISABLED", "SET_ACTIVE_CODECS_MODE_VIDEO_ONLY"], + "subscriberDtlsPassiveMode": ["SUBSCRIBER_DTLS_PASSIVE_MODE_DISABLED"], + "publisherOpusDred": ["PUBLISHER_OPUS_DRED_DISABLED"], + "publisherOpusLowBitrate": ["PUBLISHER_OPUS_LOW_BITRATE_DISABLED"], + "sdkAndroidDestroySessionOnTaskRemoved": ["SDK_ANDROID_DESTROY_SESSION_ON_TASK_REMOVED_DISABLED"], + "svcModes": ["FALSE"], + "reportTelemetryModes": ["TRUE"], + "keepDefaultDevicesModes": ["FALSE"] + }, + "sdkInfo": { + "implementation": "browser", + "version": "5.27.0", + "userAgent": "Mozilla/5.0 (X11; Linux x86_64; rv:149.0) Gecko/20100101 Firefox/149.0", + "hwConcurrency": 24 + }, + "sdkInitializationId": gen_uid(), + "disablePublisher": not is_sender, + "disableSubscriber": False, + "disableSubscriberAudio": True + } + } + + await send(hello) + print(f" -> [{name}] hello sent") + + pub_sdp_sent = False + + async def ws_loop(): + nonlocal pub_sdp_sent + + try: + async for raw in ws: + msg = json.loads(raw) + keys = [k for k in msg if k != "uid"] + if not keys: + continue + mtype = keys[0] + uid = msg.get("uid", "") + print(f" -> [{name}] << {mtype}") + + if mtype == "ack": + pass + + elif mtype == "serverHello": + sh = msg["serverHello"] + raw_ice = sh.get("rtcConfiguration", {}).get("iceServers", []) + if raw_ice: + ice = make_ice_servers(raw_ice) + old_sub = pc_sub_ref[0] + old_pub = pc_pub_ref[0] + pc_sub_ref[0] = RTCPeerConnection(RTCConfiguration(iceServers=ice)) + pc_pub_ref[0] = RTCPeerConnection(RTCConfiguration(iceServers=ice)) + if is_sender and video_track: + pc_pub_ref[0].addTrack(video_track) + setup_pc(pc_sub_ref[0], pc_pub_ref[0]) + await old_sub.close() + await old_pub.close() + print(f" -> [{name}] PC recreated with TURN") + await ack(uid) + + elif mtype == "subscriberSdpOffer": + offer_sdp = msg["subscriberSdpOffer"]["sdp"] + pc_seq = msg["subscriberSdpOffer"]["pcSeq"] + pc_sub = pc_sub_ref[0] + pc_pub = pc_pub_ref[0] + + await pc_sub.setRemoteDescription( + RTCSessionDescription(sdp=offer_sdp, type="offer") + ) + answer = await pc_sub.createAnswer() + await pc_sub.setLocalDescription(answer) + + await send({ + "uid": gen_uid(), + "subscriberSdpAnswer": { + "pcSeq": pc_seq, + "sdp": pc_sub.localDescription.sdp + } + }) + print(f" -> [{name}] >> subscriberSdpAnswer") + await ack(uid) + + if not is_sender: + await send({ + "uid": gen_uid(), + "setSlots": { + "slots": [ + {"width": 1280, "height": 720}, + {"width": 640, "height": 360} + ], + "audioSlotsCount": 0, + "key": 1, + "shutdownAllVideo": None, + "withSelfView": False, + "selfViewVisibility": "ON_LOADING_THEN_SHOW", + "gridConfig": {} + } + }) + print(f" -> [{name}] >> setSlots (запросили маршрутизацию видео у сервера!)") + + if is_sender and not pub_sdp_sent: + await asyncio.sleep(0.3) + pub_offer = await pc_pub.createOffer() + await pc_pub.setLocalDescription(pub_offer) + tracks_info = [] + for t in pc_pub.getTransceivers(): + if t.sender.track: + tracks_info.append({ + "mid": t.mid, + "transceiverMid": t.mid, + "kind": t.sender.track.kind.upper(), + "priority": 0, + "label": "QRVideoTrack", + "codecs": {}, + "groupId": 1, + "description": "" + }) + await send({ + "uid": gen_uid(), + "publisherSdpOffer": { + "pcSeq": 1, + "sdp": pc_pub.localDescription.sdp, + "tracks": tracks_info + } + }) + pub_sdp_sent = True + print(f" -> [{name}] >> publisherSdpOffer") + + elif mtype == "publisherSdpAnswer": + await pc_pub_ref[0].setRemoteDescription( + RTCSessionDescription(sdp=msg["publisherSdpAnswer"]["sdp"], type="answer") + ) + print(f" -> [{name}] publisher answer set") + await ack(uid) + + elif mtype == "webrtcIceCandidate": + cand = msg["webrtcIceCandidate"] + candidate_str = cand.get("candidate", "") + target = cand.get("target", "") + sdp_mid = cand.get("sdpMid", "0") + sdp_mline = cand.get("sdpMlineIndex", 0) + + if not candidate_str: + continue + parts = candidate_str.split() + if len(parts) < 8: + continue + + try: + tcptype = None + if "tcptype" in parts: + ti = parts.index("tcptype") + tcptype = parts[ti + 1] + + ice_c = RTCIceCandidate( + component=int(parts[1]), + foundation=parts[0].replace("candidate:", ""), + ip=parts[4], + port=int(parts[5]), + priority=int(parts[3]), + protocol=parts[2].lower(), + type=parts[7], + tcpType=tcptype, + sdpMid=sdp_mid, + sdpMLineIndex=sdp_mline + ) + if target == "SUBSCRIBER": + await pc_sub_ref[0].addIceCandidate(ice_c) + print(f" -> [{name}] sub ICE: {parts[2]} {parts[4]}:{parts[5]}") + elif target == "PUBLISHER": + await pc_pub_ref[0].addIceCandidate(ice_c) + print(f" -> [{name}] pub ICE: {parts[2]} {parts[4]}:{parts[5]}") + except Exception as e: + print(f" -> [{name}] ICE err: {e}") + + elif mtype in ("setSlots", "slotsConfig", "slotsMeta", "vadActivity", + "updateDescription", "upsertDescription", "sdkCodecsInfo", + "pingPong", "selfQualityReport", "upsertParticipantsQualityReport"): + await ack(uid) + + else: + print(f" -> [{name}] unhandled: {mtype}") + if uid: + await ack(uid) + + except websockets.exceptions.ConnectionClosed as e: + print(f" -> [{name}] WS closed: {e}") + except Exception as e: + import traceback + print(f" -> [{name}] WS err: {e}") + traceback.print_exc() + + ws_task = asyncio.create_task(ws_loop()) + + return { + "name": name, + "ws": ws, + "ws_task": ws_task, + "pc_pub_ref": pc_pub_ref, + "pc_sub_ref": pc_sub_ref, + "video_track": video_track, + "receiver": receiver_obj, + "track_tasks": track_tasks + } + + +async def run(): + print(""" + VCSend - Video QR Transfer + Request/Response over Yandex Telemost SFU + by zowue for olc +""") + + print("[0/3] Getting conference info...") + sender_conn = get_connection_info("QR_Sender") + receiver_conn = get_connection_info("QR_Receiver") + print(f" -> sender room: {sender_conn['room_id']}") + print(f" -> receiver room: {receiver_conn['room_id']}") + + print("\n[1/3] Connecting sender...") + sender = await connect_peer("QR_Sender", sender_conn) + await asyncio.sleep(5) + + print("\n[2/3] Connecting receiver...") + receiver = await connect_peer("QR_Receiver", receiver_conn) + await asyncio.sleep(5) + + print("\n[3/3] Transfer...") + url = "zarazaex.xyz/curl.txt" + if not url.startswith("http"): + url = "https://" + url + + print(f" -> fetching {url}") + resp = requests.get(url, timeout=10) + resp.raise_for_status() + data = resp.content + print(f" -> got {len(data)} bytes") + + tid = gen_uid() + chunks = chunk_data(data, tid) + sender["video_track"].set_data(chunks) + print(f" -> {len(chunks)} QR frames set, waiting for decode...") + + for i in range(300): + await asyncio.sleep(1) + if i % 15 == 0: + print(f" -> {i}s elapsed") + if receiver["receiver"] and receiver["receiver"].result is not None: + result = receiver["receiver"].result + print(f"\n :P got {len(result)} bytes\n") + print("--- content ---") + try: + print(result.decode("utf-8")) + except Exception: + print(f"[binary {len(result)} bytes]") + print("--- end ---\n") + break + else: + print(" X timeout — check /tmp/qr_recv_*.png") + + for p in [sender, receiver]: + p["ws_task"].cancel() + try: + await p["ws"].close() + except Exception: + pass + try: + await p["pc_pub_ref"][0].close() + await p["pc_sub_ref"][0].close() + except Exception: + pass + + +if __name__ == "__main__": + try: + asyncio.run(run()) + except KeyboardInterrupt: + print("\ninterrupted") \ No newline at end of file