mirror of
https://github.com/openlibrecommunity/olcrtc.git
synced 2026-05-26 23:19:47 +00:00
242 lines
13 KiB
Python
Executable File
242 lines
13 KiB
Python
Executable File
#!/usr/bin/env python3
|
|
"""PoC: SaluteJazz DataChannel over LiveKit."""
|
|
|
|
import asyncio
|
|
import io
|
|
import json
|
|
import logging
|
|
import time
|
|
import uuid
|
|
import aiohttp
|
|
from aiortc import RTCConfiguration, RTCIceCandidate, RTCIceServer, RTCPeerConnection, RTCSessionDescription
|
|
from aiortc.mediastreams import AudioStreamTrack
|
|
from aiortc.rtcconfiguration import RTCBundlePolicy
|
|
|
|
# Only let WARNING and above through from the aiortc logger.
logging.getLogger("aiortc").setLevel(logging.WARNING)

# Base URL used for the create-meeting and preconnect HTTP requests.
API_BASE = "https://bk.salutejazz.ru"

# Headers sent with every HTTP request; the client id is a random UUID
# generated once, when the module is loaded.
JAZZ_HEADERS = {
    "X-Jazz-ClientId": str(uuid.uuid4()),
    "X-Jazz-AuthType": "ANONYMOUS",
    "X-Client-AuthType": "ANONYMOUS",
    "Content-Type": "application/json",
}

# Text payloads the client pushes through the data channel, in order.
TEST_MESSAGES = [
    "Hello Jazz DC!",
    "Hello world",
    "X" * 100,
    "Final test",
]
|
|
|
|
def _pb_varint(v: int) -> bytes:
    """Encode a non-negative integer as a protobuf base-128 varint.

    The value is emitted seven bits at a time, least-significant group first;
    every byte except the last has its high bit set as a continuation flag.

    Args:
        v: The integer to encode. Must be >= 0.

    Returns:
        The varint bytes (always at least one byte).

    Raises:
        ValueError: If ``v`` is negative. A negative value would otherwise skip
            the loop and be silently truncated to a single, wrong byte.
    """
    if v < 0:
        raise ValueError(f"varint value must be non-negative, got {v}")
    out = bytearray()
    while v > 0x7F:
        out.append((v & 0x7F) | 0x80)
        v >>= 7
    out.append(v)
    return bytes(out)
|
|
|
|
def _pb_field(f: int, w: int, d: bytes) -> bytes:
    """Serialize one protobuf field: tag, optional length prefix, then data.

    Args:
        f: Field number.
        w: Wire type, packed into the low three bits of the tag.
        d: Already-encoded field data.

    Returns:
        The tag varint followed by ``d``; for wire type 2 a varint holding
        ``len(d)`` is inserted between the two.
    """
    tag = _pb_varint((f << 3) | w)
    if w == 2:
        # Length-delimited fields carry their byte count before the data.
        return tag + _pb_varint(len(d)) + d
    return tag + d
|
|
|
|
def _read_varint(s: io.BytesIO) -> int | None:
|
|
res, shift = 0, 0
|
|
while b := s.read(1):
|
|
res |= (b[0] & 0x7F) << shift
|
|
if not (b[0] & 0x80): return res
|
|
shift += 7
|
|
return None
|
|
|
|
def encode_data_packet(payload: bytes, topic: str = "") -> bytes:
    """Wrap ``payload`` in the two-level protobuf envelope used on the data channel.

    The inner message holds the payload (field 2), the topic (field 4, only
    when ``topic`` is non-empty) and a fresh random UUID string (field 8).
    The outer message is field 1 set to varint 0 followed by the inner message
    as field 2.
    NOTE(review): presumably mirrors LiveKit's DataPacket/UserPacket layout;
    confirm against the protocol definition.

    Args:
        payload: Raw bytes to carry.
        topic: Optional topic label; omitted from the packet when empty.

    Returns:
        The serialized packet.
    """
    inner_parts = [_pb_field(2, 2, payload)]
    if topic:
        inner_parts.append(_pb_field(4, 2, topic.encode()))
    inner_parts.append(_pb_field(8, 2, str(uuid.uuid4()).encode()))
    inner = b"".join(inner_parts)

    header = _pb_field(1, 0, _pb_varint(0))
    return header + _pb_field(2, 2, inner)
|
|
|
|
def _iter_len_delimited(raw: bytes):
    """Yield ``(field_number, data)`` for each length-delimited field in ``raw``.

    Varint (wire type 0), 64-bit (1) and 32-bit (5) fields are skipped.
    Iteration stops at the end of input, at an unknown wire type, or at a
    length-delimited field whose declared length exceeds the bytes that remain
    (a truncated field is never yielded).
    """
    stream = io.BytesIO(raw)
    while (tag := _read_varint(stream)) is not None:
        wire_type = tag & 0x07
        if wire_type == 0:
            _read_varint(stream)
        elif wire_type == 1:
            stream.read(8)
        elif wire_type == 5:
            stream.read(4)
        elif wire_type == 2:
            length = _read_varint(stream)
            if length is None:
                return
            data = stream.read(length)
            if len(data) != length:
                # Short read: the packet was cut off mid-field.
                return
            yield tag >> 3, data
        else:
            return


def decode_data_packet(raw: bytes) -> tuple[bytes, str] | None:
    """Extract the payload and topic from a packet built by ``encode_data_packet``.

    The outer message is scanned for field 2 (the nested user message; the
    last occurrence wins). Inside it, field 2 is the payload and field 4 the
    topic, decoded as UTF-8 with undecodable bytes replaced.

    Args:
        raw: Serialized packet bytes.

    Returns:
        ``(payload, topic)`` where missing fields default to ``b""`` and
        ``""``, or ``None`` if the outer message has no complete field 2.
    """
    user_packet = None
    for field_number, data in _iter_len_delimited(raw):
        if field_number == 2:
            user_packet = data
    if user_packet is None:
        return None

    payload, topic = b"", ""
    for field_number, data in _iter_len_delimited(user_packet):
        if field_number == 2:
            payload = data
        elif field_number == 4:
            topic = data.decode(errors="replace")
    return payload, topic
|
|
|
|
def _parse_ice_candidate(c: dict) -> RTCIceCandidate | None:
    """Turn one signalling candidate entry into an ``RTCIceCandidate``.

    Returns ``None`` when the candidate line has fewer than eight
    whitespace-separated parts or a numeric part does not parse.
    """
    parts = (c.get("candidate") or "").split()
    if len(parts) < 8:
        return None
    try:
        # sdpMid / sdpMLineIndex must be passed by keyword: positionally they
        # would land in the relatedAddress / relatedPort slots.
        return RTCIceCandidate(
            component=int(parts[1]),
            foundation=parts[0].split(":", 1)[-1],
            ip=parts[4],
            port=int(parts[5]),
            priority=int(parts[3]),
            protocol=parts[2],
            type=parts[7],
            sdpMid=str(c.get("sdpMid", "0")),
            sdpMLineIndex=c.get("sdpMLineIndex", 0),
        )
    except ValueError:
        return None


async def _create_peer(name: str, room: dict, session: aiohttp.ClientSession, is_server: bool = False, stats: dict | None = None) -> dict:
    """Join ``room`` over its websocket connector and start signalling.

    Sends the ``join`` event, then starts two background tasks: one that
    handles incoming signalling messages (ICE config, subscriber offer,
    publisher answer, remote ICE candidates) and one that sends an
    ``rtc:ping`` every five seconds once a group id is known.

    Args:
        name: Participant name sent in the join request.
        room: Dict with ``connectorUrl``, ``roomId`` and ``password``.
        session: aiohttp session used to open the websocket.
        is_server: When true, every received ``poc`` packet is echoed back
            on the publisher data channel with an ``Echo: `` prefix.
        stats: Dict with ``sent`` / ``recv`` counters updated in place. A
            fresh one is created when omitted.

    Returns:
        Peer dict with keys ``ws``, ``pc_sub``, ``pc_pub``, ``dc``, ``ready``
        (set when the publisher data channel opens), ``sub_ready`` (set when
        the subscriber connection reaches ``connected``), ``task`` and
        ``keep``.
    """
    if stats is None:
        stats = {"sent": 0, "recv": 0}
    log = logging.getLogger(__name__)

    ws = await session.ws_connect(room["connectorUrl"])
    await ws.send_json({
        "roomId": room["roomId"],
        "event": "join",
        "requestId": str(uuid.uuid4()),
        "payload": {
            "password": room["password"],
            "participantName": name,
            "supportedFeatures": {"attachedRooms": True, "sessionGroups": True},
            "isSilent": False,
        },
    })

    peer = {"ws": ws, "pc_sub": None, "pc_pub": None, "dc": None, "ready": asyncio.Event(), "sub_ready": asyncio.Event()}
    group_id = None
    ice_servers = []
    # Remote candidates that arrived before the matching connection existed.
    pending_ice = {"SUBSCRIBER": [], "PUBLISHER": []}
    pc_key = {"SUBSCRIBER": "pc_sub", "PUBLISHER": "pc_pub"}

    def media_in(payload: dict) -> dict:
        """Build a ``media-in`` envelope around ``payload`` for the current group."""
        return {"roomId": room["roomId"], "event": "media-in", "groupId": group_id, "requestId": str(uuid.uuid4()), "payload": payload}

    def new_pc() -> RTCPeerConnection:
        """Create a peer connection using the ICE servers collected so far."""
        return RTCPeerConnection(configuration=RTCConfiguration(iceServers=ice_servers, bundlePolicy=RTCBundlePolicy.MAX_BUNDLE))

    def as_bytes(msg_data) -> bytes:
        """Normalize a data-channel message (bytes or str) to bytes."""
        return msg_data if isinstance(msg_data, bytes) else msg_data.encode()

    async def send_local_ice(e, target: str) -> None:
        """Forward a locally gathered candidate to the signalling server."""
        if e and e.candidate and group_id:
            await ws.send_json(media_in({"method": "rtc:ice", "rtcIceCandidates": [{"candidate": e.candidate.candidate, "sdpMid": e.candidate.sdpMid, "sdpMLineIndex": e.candidate.sdpMLineIndex, "usernameFragment": "", "target": target}]}))

    async def flush_pending(target: str, pc: RTCPeerConnection) -> None:
        """Apply and clear the candidates queued for ``target``."""
        for c in pending_ice[target]:
            ice = _parse_ice_candidate(c)
            if ice is not None:
                await pc.addIceCandidate(ice)
        pending_ice[target].clear()

    def on_config(p: dict) -> None:
        """Record the first UDP URL of every advertised ICE server."""
        for server in p.get("configuration", {}).get("iceServers", []):
            udp_urls = [u for u in server.get("urls", []) if "transport=udp" in u]
            if udp_urls:
                ice_servers.append(RTCIceServer(urls=[udp_urls[0]], username=server.get("username"), credential=server.get("credential")))

    async def on_offer(p: dict) -> None:
        """Answer the subscriber offer, then create and offer the publisher side."""
        pc_sub = new_pc()
        peer["pc_sub"] = pc_sub

        @pc_sub.on("connectionstatechange")
        def _():
            if pc_sub.connectionState == "connected":
                peer["sub_ready"].set()

        @pc_sub.on("datachannel")
        def on_dc(ch):
            if ch.label != "_reliable":
                return

            @ch.on("message")
            def on_msg(msg_data):
                parsed = decode_data_packet(as_bytes(msg_data))
                if not parsed or parsed[1] != "poc":
                    return
                stats["recv"] += 1
                if is_server and peer["dc"]:
                    try:
                        peer["dc"].send(encode_data_packet(f"Echo: {parsed[0].decode()}".encode(), "poc"))
                        stats["sent"] += 1
                    except Exception as exc:
                        # Echoing is best-effort, but a failure should be visible.
                        log.warning("Echo reply failed: %s", exc)

        # NOTE(review): confirm that aiortc actually emits "icecandidate";
        # if it does not, these two handlers never fire.
        @pc_sub.on("icecandidate")
        async def on_sub_ice(e):
            await send_local_ice(e, "SUBSCRIBER")

        await pc_sub.setRemoteDescription(RTCSessionDescription(sdp=p["description"]["sdp"], type="offer"))
        await pc_sub.setLocalDescription(await pc_sub.createAnswer())
        await ws.send_json(media_in({"method": "rtc:answer", "description": {"type": "answer", "sdp": pc_sub.localDescription.sdp}}))
        await flush_pending("SUBSCRIBER", pc_sub)
        await asyncio.sleep(0.3)

        pc_pub = new_pc()
        peer["pc_pub"] = pc_pub
        pc_pub.addTrack(AudioStreamTrack())
        dc = pc_pub.createDataChannel("_reliable", ordered=True)
        peer["dc"] = dc

        @dc.on("open")
        def on_open():
            peer["ready"].set()

        @dc.on("message")
        def on_pub_msg(msg_data):
            parsed = decode_data_packet(as_bytes(msg_data))
            if parsed and parsed[1] == "poc":
                stats["recv"] += 1

        @pc_pub.on("icecandidate")
        async def on_pub_ice(e):
            await send_local_ice(e, "PUBLISHER")

        await ws.send_json(media_in({"method": "rtc:track:add", "cid": str(uuid.uuid4()), "track": {"type": "AUDIO", "source": "MICROPHONE", "muted": True}}))
        await pc_pub.setLocalDescription(await pc_pub.createOffer())
        await ws.send_json(media_in({"method": "rtc:offer", "description": {"type": "offer", "sdp": pc_pub.localDescription.sdp}}))

    async def on_answer(p: dict) -> None:
        """Apply the publisher answer and any candidates queued for it."""
        await peer["pc_pub"].setRemoteDescription(RTCSessionDescription(sdp=p["description"]["sdp"], type="answer"))
        await flush_pending("PUBLISHER", peer["pc_pub"])

    async def on_remote_ice(p: dict) -> None:
        """Add remote candidates, queueing those whose connection is not up yet."""
        for c in p.get("rtcIceCandidates", []):
            ice = _parse_ice_candidate(c)
            if ice is None:
                continue
            target = c.get("target")
            if target not in pc_key:
                continue
            pc = peer[pc_key[target]]
            if pc:
                await pc.addIceCandidate(ice)
            else:
                pending_ice[target].append(c)

    async def ws_loop():
        nonlocal group_id
        try:
            async for msg in ws:
                if msg.type != aiohttp.WSMsgType.TEXT:
                    continue
                try:
                    data = json.loads(msg.data)
                except ValueError:
                    continue  # not JSON; nothing to dispatch
                if not isinstance(data, dict):
                    continue
                ev = data.get("event", "")
                p = data.get("payload") or {}
                m = p.get("method", "")

                if ev == "join-response":
                    group_id = (p.get("participantGroup") or {}).get("groupId")
                elif ev != "media-out":
                    continue
                elif m == "rtc:config":
                    on_config(p)
                elif m == "rtc:offer" and not peer["pc_sub"]:
                    await on_offer(p)
                elif m == "rtc:answer" and peer["pc_pub"]:
                    await on_answer(p)
                elif m == "rtc:ice":
                    await on_remote_ice(p)
        except Exception:
            # Surface the failure instead of letting the task die silently.
            log.exception("Signalling loop for %s failed", name)
            raise

    async def _keep():
        while not ws.closed:
            await asyncio.sleep(5)
            # Re-check after sleeping: the socket may have closed meanwhile.
            if group_id and not ws.closed:
                await ws.send_json(media_in({"method": "rtc:ping", "ping_req": {"timestamp": int(time.time()*1000), "rtt": 0}}))

    peer["task"] = asyncio.create_task(ws_loop())
    peer["keep"] = asyncio.create_task(_keep())
    return peer
|
|
|
|
async def _close_peer(peer: dict) -> None:
    """Cancel a peer's background tasks, then close its websocket and connections."""
    tasks = [peer[key] for key in ("task", "keep")]
    for task in tasks:
        task.cancel()
    # Await the tasks so their cancellation (or earlier failure) is consumed.
    await asyncio.gather(*tasks, return_exceptions=True)
    await peer["ws"].close()
    for pc in (peer["pc_sub"], peer["pc_pub"]):
        if pc:
            await pc.close()


async def run_poc() -> dict:
    """Run the proof of concept end to end and report what happened.

    Creates a meeting and preconnects over HTTP, joins it as a "Server" and a
    "Client" peer, sends every entry of ``TEST_MESSAGES`` from the client and
    waits for the replies. Every peer that was created is torn down before
    returning, including when connecting fails part-way.

    Returns:
        Dict with ``server_ok`` / ``client_ok`` flags, the client's ``sent``
        and ``recv`` counters, and a list of ``errors`` strings.
    """
    print("\n--- SaluteJazz PoC ---")
    results = {"server_ok": False, "client_ok": False, "sent": 0, "recv": 0, "errors": []}
    s_stats, c_stats = {"sent": 0, "recv": 0}, {"sent": 0, "recv": 0}
    peers = []

    async with aiohttp.ClientSession() as session:
        try:
            # "async with" releases each response even if parsing fails.
            async with session.post(f"{API_BASE}/room/create-meeting", headers=JAZZ_HEADERS, json={"title": "PoC", "guestEnabled": True, "lobbyEnabled": False}) as r:
                r.raise_for_status()
                rj = await r.json()
            async with session.post(f"{API_BASE}/room/{rj['roomId']}/preconnect", headers=JAZZ_HEADERS, json={"password": rj["password"], "jazzNextMigration": {"b2bBaseRoomSupport": True, "demoRoomBaseSupport": True, "demoRoomVersionSupport": 2, "mediaWithoutAutoSubscribeSupport": True}}) as r2:
                r2.raise_for_status()
                connector_url = (await r2.json())["connectorUrl"]
            room_inf = {"roomId": rj["roomId"], "password": rj["password"], "connectorUrl": connector_url}
        except Exception as e:
            results["errors"].append(f"Auth fail: {e}")
            return results

        try:
            print("[1/3] Connecting Server & Client...")
            try:
                server = await _create_peer("Server", room_inf, session, is_server=True, stats=s_stats)
                peers.append(server)
                await asyncio.wait_for(server["ready"].wait(), 15.0)
                results["server_ok"] = True

                client = await _create_peer("Client", room_inf, session, is_server=False, stats=c_stats)
                peers.append(client)
                await asyncio.wait_for(client["ready"].wait(), 15.0)
                results["client_ok"] = True
                print(" :P Peers connected")
            except Exception as e:
                # A timeout has an empty message; fall back to the type name.
                results["errors"].append(str(e) or type(e).__name__)
                return results

            print("\n[2/3] Exchanging messages...")
            await asyncio.sleep(1)
            for idx, msg in enumerate(TEST_MESSAGES, 1):
                try:
                    client["dc"].send(encode_data_packet(msg.encode(), "poc"))
                    c_stats["sent"] += 1
                    print(f" -> Sent: {msg}")
                    await asyncio.sleep(0.5)
                except Exception as e:
                    results["errors"].append(f"Sending {idx} failed: {str(e)}")

            # Give the last echoes time to arrive before reading the counters.
            await asyncio.sleep(3)
            results["sent"], results["recv"] = c_stats["sent"], c_stats["recv"]

            print("\n[3/3] Cleaning up...")
        finally:
            for peer in peers:
                await _close_peer(peer)

    return results
|
|
|
|
def print_results(res: dict):
    """Print a human-readable summary of a ``run_poc`` result dict.

    Shows the server/client connection marks, the sent/received counters,
    one line per recorded error, and a final verdict that is a success only
    when at least one message was sent and every sent message got a reply.

    Args:
        res: Dict with ``server_ok``, ``client_ok``, ``sent``, ``recv`` and
            ``errors`` keys.
    """
    server_mark = ":P" if res["server_ok"] else "X"
    client_mark = ":P" if res["client_ok"] else "X"

    print("\n--- TEST RESULTS ---")
    print(f"Server: {server_mark} / Client: {client_mark}")
    print(f"Messages: Sent {res['sent']} / Recv {res['recv']}")

    for err in res["errors"] or ():
        print(f" Error: {err}")

    if res["sent"] and res["sent"] == res["recv"]:
        verdict = ":P SUCCESS"
    else:
        verdict = "X FAILED"
    print(f"\n{verdict}\n")
|
|
|
|
# Script entry point: run the PoC and print its summary; Ctrl-C exits quietly.
if __name__ == "__main__":
    try:
        print_results(asyncio.run(run_poc()))
    except KeyboardInterrupt:
        pass
|