Files
olcrtc/code/jazz_poc.py

243 lines
13 KiB
Python
Executable File

#!/usr/bin/env python3
"""PoC: SaluteJazz DataChannel over LiveKit."""
import asyncio
import io
import json
import logging
import time
import uuid
import aiohttp
from aiortc import RTCConfiguration, RTCIceCandidate, RTCIceServer, RTCPeerConnection, RTCSessionDescription
from aiortc.mediastreams import AudioStreamTrack
from aiortc.rtcconfiguration import RTCBundlePolicy
logging.getLogger("aiortc").setLevel(logging.WARNING)
API_BASE = "https://bk.salutejazz.ru"
JAZZ_HEADERS = {"X-Jazz-ClientId": str(uuid.uuid4()), "X-Jazz-AuthType": "ANONYMOUS", "X-Client-AuthType": "ANONYMOUS", "Content-Type": "application/json"}
TEST_MESSAGES = ["Hello Jazz DC!", "Hello world", "X" * 100, "Final test"]
def _pb_varint(v: int) -> bytes:
b = bytearray()
while v > 0x7F: b.append((v & 0x7F) | 0x80); v >>= 7
b.append(v & 0x7F)
return bytes(b)
def _pb_field(f: int, w: int, d: bytes) -> bytes:
t = _pb_varint((f << 3) | w)
return t + d if w == 0 else (t + _pb_varint(len(d)) + d if w == 2 else t + d)
def _read_varint(s: io.BytesIO) -> int | None:
res, shift = 0, 0
while b := s.read(1):
res |= (b[0] & 0x7F) << shift
if not (b[0] & 0x80): return res
shift += 7
return None
def encode_data_packet(payload: bytes, topic: str = "") -> bytes:
uf = _pb_field(2, 2, payload) + (_pb_field(4, 2, topic.encode()) if topic else b"") + _pb_field(8, 2, str(uuid.uuid4()).encode())
return _pb_field(1, 0, _pb_varint(0)) + _pb_field(2, 2, uf)
def decode_data_packet(raw: bytes) -> tuple[bytes, str] | None:
s = io.BytesIO(raw)
ud = None
while (tg := _read_varint(s)) is not None:
wt = tg & 0x07
if wt == 0: _read_varint(s)
elif wt == 2:
l = _read_varint(s)
if l is None: break
d = s.read(l)
if (tg >> 3) == 2: ud = d
elif wt == 1: s.read(8)
elif wt == 5: s.read(4)
else: break
if ud is None: return None
p, t, ins = b"", "", io.BytesIO(ud)
while (tg := _read_varint(ins)) is not None:
wt = tg & 0x07
if wt == 0: _read_varint(ins)
elif wt == 2:
l = _read_varint(ins)
if l is None: break
d = ins.read(l)
fn = tg >> 3
if fn == 2: p = d
elif fn == 4: t = d.decode(errors="replace")
elif wt == 1: ins.read(8)
elif wt == 5: ins.read(4)
else: break
return p, t
async def _create_peer(name: str, room: dict, session: aiohttp.ClientSession, is_server: bool = False, stats: dict = None) -> dict:
ws = await session.ws_connect(room["connectorUrl"])
await ws.send_json({"roomId": room["roomId"], "event": "join", "requestId": str(uuid.uuid4()), "payload": {"password": room["password"], "participantName": name, "supportedFeatures": {"attachedRooms": True, "sessionGroups": True}, "isSilent": False}})
peer = {"ws": ws, "pc_sub": None, "pc_pub": None, "dc": None, "ready": asyncio.Event(), "sub_ready": asyncio.Event()}
group_id, p_ice_sub, p_ice_pub = None, [], []
ice_servers = []
async def ws_loop():
nonlocal group_id
async for msg in ws:
if msg.type == aiohttp.WSMsgType.TEXT:
data = json.loads(msg.data)
ev = data.get("event", "")
p = data.get("payload", {})
m = p.get("method", "")
if ev == "join-response": group_id = p.get("participantGroup", {}).get("groupId")
elif ev == "media-out" and m == "rtc:config":
for s in p.get("configuration", {}).get("iceServers", []):
urls = [u for u in s.get("urls", []) if "transport=udp" in u]
if urls: ice_servers.append(RTCIceServer(urls=[urls[0]], username=s.get("username"), credential=s.get("credential")))
elif ev == "media-out" and m == "rtc:offer" and not peer["pc_sub"]:
peer["pc_sub"] = RTCPeerConnection(configuration=RTCConfiguration(iceServers=ice_servers, bundlePolicy=RTCBundlePolicy.MAX_BUNDLE))
@peer["pc_sub"].on("connectionstatechange")
def _():
if peer["pc_sub"].connectionState == "connected": peer["sub_ready"].set()
@peer["pc_sub"].on("datachannel")
def on_dc(ch):
if ch.label != "_reliable": return
@ch.on("message")
def on_msg(msg_data):
parsed = decode_data_packet(msg_data if isinstance(msg_data, bytes) else msg_data.encode())
if not parsed or parsed[1] != "poc": return
stats["recv"] += 1
if is_server and peer["dc"]:
try:
peer["dc"].send(encode_data_packet(f"Echo: {parsed[0].decode()}".encode(), "poc"))
stats["sent"] += 1
except: pass
@peer["pc_sub"].on("icecandidate")
async def on_sub_ice(e):
if e and e.candidate and group_id:
await ws.send_json({"roomId": room["roomId"], "event": "media-in", "groupId": group_id, "requestId": str(uuid.uuid4()), "payload": {"method": "rtc:ice", "rtcIceCandidates": [{"candidate": e.candidate.candidate, "sdpMid": e.candidate.sdpMid, "sdpMLineIndex": e.candidate.sdpMLineIndex, "usernameFragment": "", "target": "SUBSCRIBER"}]}})
await peer["pc_sub"].setRemoteDescription(RTCSessionDescription(sdp=p["description"]["sdp"], type="offer"))
ans = await peer["pc_sub"].createAnswer()
await peer["pc_sub"].setLocalDescription(ans)
await ws.send_json({"roomId": room["roomId"], "event": "media-in", "groupId": group_id, "requestId": str(uuid.uuid4()), "payload": {"method": "rtc:answer", "description": {"type": "answer", "sdp": peer["pc_sub"].localDescription.sdp}}})
for c in p_ice_sub:
pts = c.get("candidate","").split()
if len(pts) >= 8: await peer["pc_sub"].addIceCandidate(RTCIceCandidate(int(pts[1]), pts[0].split(":")[1], pts[4], int(pts[5]), int(pts[3]), pts[2], pts[7], str(c.get("sdpMid", "0")), c.get("sdpMLineIndex", 0)))
p_ice_sub.clear()
await asyncio.sleep(0.3)
# pub
peer["pc_pub"] = RTCPeerConnection(configuration=RTCConfiguration(iceServers=ice_servers, bundlePolicy=RTCBundlePolicy.MAX_BUNDLE))
peer["pc_pub"].addTrack(AudioStreamTrack())
peer["dc"] = peer["pc_pub"].createDataChannel("_reliable", ordered=True)
@peer["dc"].on("open")
def on_open(): peer["ready"].set()
@peer["dc"].on("message")
def on_pub_msg(msg_data):
parsed = decode_data_packet(msg_data if isinstance(msg_data, bytes) else msg_data.encode())
if parsed and parsed[1] == "poc": stats["recv"] += 1
@peer["pc_pub"].on("icecandidate")
async def on_pub_ice(e):
if e and e.candidate and group_id:
await ws.send_json({"roomId": room["roomId"], "event": "media-in", "groupId": group_id, "requestId": str(uuid.uuid4()), "payload": {"method": "rtc:ice", "rtcIceCandidates": [{"candidate": e.candidate.candidate, "sdpMid": e.candidate.sdpMid, "sdpMLineIndex": e.candidate.sdpMLineIndex, "usernameFragment": "", "target": "PUBLISHER"}]}})
await ws.send_json({"roomId": room["roomId"], "event": "media-in", "groupId": group_id, "requestId": str(uuid.uuid4()), "payload": {"method": "rtc:track:add", "cid": str(uuid.uuid4()), "track": {"type": "AUDIO", "source": "MICROPHONE", "muted": True}}})
pub_offer = await peer["pc_pub"].createOffer()
await peer["pc_pub"].setLocalDescription(pub_offer)
await ws.send_json({"roomId": room["roomId"], "event": "media-in", "groupId": group_id, "requestId": str(uuid.uuid4()), "payload": {"method": "rtc:offer", "description": {"type": "offer", "sdp": peer["pc_pub"].localDescription.sdp}}})
elif ev == "media-out" and m == "rtc:answer" and peer["pc_pub"]:
await peer["pc_pub"].setRemoteDescription(RTCSessionDescription(sdp=p["description"]["sdp"], type="answer"))
for c in p_ice_pub:
pts = c.get("candidate","").split()
if len(pts) >= 8: await peer["pc_pub"].addIceCandidate(RTCIceCandidate(int(pts[1]), pts[0].split(":")[1], pts[4], int(pts[5]), int(pts[3]), pts[2], pts[7], str(c.get("sdpMid", "0")), c.get("sdpMLineIndex", 0)))
p_ice_pub.clear()
elif ev == "media-out" and m == "rtc:ice":
for c in p.get("rtcIceCandidates", []):
pts = c.get("candidate","").split()
if len(pts) < 8: continue
ice = RTCIceCandidate(int(pts[1]), pts[0].split(":")[1], pts[4], int(pts[5]), int(pts[3]), pts[2], pts[7], str(c.get("sdpMid", "0")), c.get("sdpMLineIndex", 0))
tgt = c.get("target")
if tgt == "SUBSCRIBER": (await peer["pc_sub"].addIceCandidate(ice)) if peer["pc_sub"] else p_ice_sub.append(c)
elif tgt == "PUBLISHER": (await peer["pc_pub"].addIceCandidate(ice)) if peer["pc_pub"] else p_ice_pub.append(c)
async def _keep():
while not ws.closed:
await asyncio.sleep(5)
if group_id: await ws.send_json({"roomId": room["roomId"], "event": "media-in", "groupId": group_id, "requestId": str(uuid.uuid4()), "payload": {"method": "rtc:ping", "ping_req": {"timestamp": int(time.time()*1000), "rtt": 0}}})
peer["task"] = asyncio.create_task(ws_loop())
peer["keep"] = asyncio.create_task(_keep())
return peer
async def run_poc() -> dict:
print("\n--- SaluteJazz PoC ---")
results = {"server_ok": False, "client_ok": False, "sent": 0, "recv": 0, "errors": []}
s_stats, c_stats = {"sent": 0, "recv": 0}, {"sent": 0, "recv": 0}
async with aiohttp.ClientSession() as session:
try:
r = await session.post(f"{API_BASE}/room/create-meeting", headers=JAZZ_HEADERS, json={"title": "PoC", "guestEnabled": True, "lobbyEnabled": False})
rj = await r.json()
r2 = await session.post(f"{API_BASE}/room/{rj['roomId']}/preconnect", headers=JAZZ_HEADERS, json={"password": rj["password"], "jazzNextMigration": {"b2bBaseRoomSupport": True, "demoRoomBaseSupport": True, "demoRoomVersionSupport": 2, "mediaWithoutAutoSubscribeSupport": True}})
room_inf = {"roomId": rj["roomId"], "password": rj["password"], "connectorUrl": (await r2.json())["connectorUrl"]}
except Exception as e:
results["errors"].append(f"Auth fail: {e}")
return results
print("[1/3] Connecting Server & Client...")
try:
server = await _create_peer("Server", room_inf, session, is_server=True, stats=s_stats)
await asyncio.wait_for(server["ready"].wait(), 15.0)
results["server_ok"] = True
client = await _create_peer("Client", room_inf, session, is_server=False, stats=c_stats)
await asyncio.wait_for(client["ready"].wait(), 15.0)
results["client_ok"] = True
print(" :P Peers connected")
except Exception as e:
results["errors"].append(str(e))
return results
print("\n[2/3] Exchanging messages...")
await asyncio.sleep(1)
for idx, msg in enumerate(TEST_MESSAGES, 1):
try:
client["dc"].send(encode_data_packet(msg.encode(), "poc"))
c_stats["sent"] += 1
print(f" -> Sent: {msg}")
await asyncio.sleep(0.5)
except Exception as e:
results["errors"].append(f"Sending {idx} failed: {str(e)}")
await asyncio.sleep(3)
results["sent"], results["recv"] = c_stats["sent"], c_stats["recv"]
print("\n[3/3] Cleaning up...")
for p in (server, client):
for t in ["task", "keep"]: p[t].cancel()
await p["ws"].close()
for pc in [p["pc_sub"], p["pc_pub"]]:
if pc: await pc.close()
return results
def print_results(res: dict):
print("\n--- TEST RESULTS ---")
print(f"Server: {':P' if res['server_ok'] else 'X'} / Client: {':P' if res['client_ok'] else 'X'}")
print(f"Messages: Sent {res['sent']} / Recv {res['recv']}")
if res['errors']:
for e in res['errors']: print(f" Error: {e}")
print(f"\n{':P SUCCESS' if res['sent'] and res['sent'] == res['recv'] else 'X FAILED'}\n")
if __name__ == "__main__":
try: res = asyncio.run(run_poc()); print_results(res)
except KeyboardInterrupt: pass