From 085aadcad718f58b8bed7b4af41d054202102d76 Mon Sep 17 00:00:00 2001 From: zarazaex69 Date: Tue, 19 May 2026 21:39:07 +0300 Subject: [PATCH] refactor: remove SaluteJazz carrier support --- cmd/olcrtc/main_test.go | 9 +- code/jazz_info.py | 59 - code/jazz_poc_datachannel.py | 241 ---- docker-compose.client.yml | 2 +- docker-compose.server.yml | 2 +- docs/about.md | 76 +- docs/configuration.md | 2 +- docs/fast.md | 9 +- docs/project-map.md | 7 +- docs/server.example.yaml | 6 +- docs/settings.md | 20 +- docs/uri.md | 8 +- internal/app/session/session.go | 5 +- internal/app/session/session_test.go | 15 +- internal/auth/auth.go | 2 +- internal/auth/salutejazz/api.go | 198 --- internal/auth/salutejazz/api_test.go | 143 -- internal/auth/salutejazz/salutejazz.go | 70 - internal/config/config.go | 4 +- internal/e2e/tunnel_test.go | 46 +- internal/engine/builtin/builtin.go | 11 +- internal/engine/engine.go | 4 +- internal/engine/jitsi/jitsi.go | 2 +- internal/engine/livekit/livekit.go | 2 +- internal/engine/salutejazz/close_test.go | 164 --- internal/engine/salutejazz/datapacket.go | 144 -- internal/engine/salutejazz/datapacket_test.go | 70 - internal/engine/salutejazz/salutejazz.go | 1205 ----------------- .../engine/salutejazz/session_helpers_test.go | 320 ----- mobile/mobile.go | 11 +- mobile/mobile_test.go | 21 +- pkg/olcrtc/olcrtc.go | 14 +- pkg/olcrtc/tunnel/tunnel.go | 6 +- script/cnc.sh | 8 +- script/docker/olcrtc-entrypoint.sh | 27 +- script/srv.sh | 30 +- 36 files changed, 97 insertions(+), 2866 deletions(-) delete mode 100755 code/jazz_info.py delete mode 100755 code/jazz_poc_datachannel.py delete mode 100644 internal/auth/salutejazz/api.go delete mode 100644 internal/auth/salutejazz/api_test.go delete mode 100644 internal/auth/salutejazz/salutejazz.go delete mode 100644 internal/engine/salutejazz/close_test.go delete mode 100644 internal/engine/salutejazz/datapacket.go delete mode 100644 internal/engine/salutejazz/datapacket_test.go delete mode 100644 internal/engine/salutejazz/salutejazz.go delete mode 100644 internal/engine/salutejazz/session_helpers_test.go diff --git a/cmd/olcrtc/main_test.go b/cmd/olcrtc/main_test.go index 7c13e8e..e8292b9 100644 --- a/cmd/olcrtc/main_test.go +++ b/cmd/olcrtc/main_test.go @@ -87,7 +87,8 @@ func TestRunWithConfigValidationAndDataDirErrors(t *testing.T) { scfg := session.Config{ Mode: "srv", Transport: "datachannel", - Auth: "jazz", + Auth: "jitsi", + RoomID: "https://meet.small-dm.ru/test", KeyHex: "key", DNSServer: "1.1.1.1:53", } @@ -117,7 +118,7 @@ func TestRunWithArgsSuccessfulSessionReturn(t *testing.T) { called := false runSession = func(ctx context.Context, cfg session.Config) error { called = true - if cfg.Mode != "srv" || cfg.Auth != "jazz" { + if cfg.Mode != "srv" || cfg.Auth != "jitsi" { t.Fatalf("session config = %+v", cfg) } select { @@ -132,7 +133,9 @@ func TestRunWithArgsSuccessfulSessionReturn(t *testing.T) { mode: srv link: direct auth: - provider: jazz + provider: jitsi +room: + id: https://meet.small-dm.ru/test crypto: key: key net: diff --git a/code/jazz_info.py b/code/jazz_info.py deleted file mode 100755 index 90081e5..0000000 --- a/code/jazz_info.py +++ /dev/null @@ -1,59 +0,0 @@ -#!/usr/bin/env python3 -import asyncio -import json -import uuid -import aiohttp - -API_BASE = "https://bk.salutejazz.ru" -JAZZ_HEADERS = {"X-Jazz-ClientId": str(uuid.uuid4()), "X-Jazz-AuthType": "ANONYMOUS", "X-Client-AuthType": "ANONYMOUS", "Content-Type": "application/json"} - -async def get_jazz_info(): - print("\n--- SaluteJazz Info ---") - timeout = aiohttp.ClientTimeout(total=15) - async with aiohttp.ClientSession(timeout=timeout) as session: - print("[1/4] API Initialization...") - try: - r = await session.post(f"{API_BASE}/room/create-meeting", headers=JAZZ_HEADERS, json={"title": "InfoBot", "guestEnabled": True, "lobbyEnabled": False, "room3dEnabled": False}) - rj = await r.json() - print(" :P Room created") - print(json.dumps(rj, indent=2)) - - r2 = await session.post(f"{API_BASE}/room/{rj['roomId']}/preconnect", headers=JAZZ_HEADERS, json={"password": rj["password"], "jazzNextMigration": {"b2bBaseRoomSupport": True, "sdkRoomSupport": True, "mediaWithoutAutoSubscribeSupport": True}}) - r2j = await r2.json() - print(" :P Preconnect info received") - print(json.dumps(r2j, indent=2)) - conn_url = r2j['connectorUrl'] - except Exception as e: - print(f" X Error: {e}"); return - - print(f"\n[2/4] Connecting to signaling...") - async with session.ws_connect(conn_url) as ws: - await ws.send_json({"roomId": rj["roomId"], "event": "join", "requestId": str(uuid.uuid4()), "payload": {"password": rj["password"], "participantName": "InfoBot", "supportedFeatures": {"attachedRooms": True}, "isSilent": False}}) - print(" :P Signaling established") - - print("\n[3/4] Collecting network & media details...") - end = asyncio.get_event_loop().time() + 8 - while asyncio.get_event_loop().time() < end: - try: - m = await asyncio.wait_for(ws.receive(), 1) - if m.type == aiohttp.WSMsgType.TEXT: - d = json.loads(m.data); ev = d.get("event", ""); p = d.get("payload", {}); meth = p.get("method", "") - print(f" -> Event: {ev}{' ('+meth+')' if meth else ''}") - if meth == "rtc:config": - print("\n--- ICE Servers ---") - print(json.dumps(p.get("configuration", {}).get("iceServers", []), indent=2)) - elif meth == "rtc:offer": - print("\n--- SDP Offer (Codecs & Quality) ---") - print(p.get("description", {}).get("sdp", "")) - elif ev == "join-response": - print("\n--- Participant Group ---") - print(json.dumps(p.get("participantGroup", {}), indent=2)) - else: - print(json.dumps(p, indent=2)) - except: continue - - print("\n--- INFO COLLECTION COMPLETE ---") - -if __name__ == "__main__": - try: asyncio.run(get_jazz_info()) - except KeyboardInterrupt: pass diff --git a/code/jazz_poc_datachannel.py b/code/jazz_poc_datachannel.py deleted file mode 100755 index 050c069..0000000 --- a/code/jazz_poc_datachannel.py +++ /dev/null @@ -1,241 +0,0 @@ -#!/usr/bin/env python3 -"""PoC: SaluteJazz DataChannel over LiveKit.""" - -import asyncio -import io -import json -import logging -import time -import uuid -import aiohttp -from aiortc import RTCConfiguration, RTCIceCandidate, RTCIceServer, RTCPeerConnection, RTCSessionDescription -from aiortc.mediastreams import AudioStreamTrack -from aiortc.rtcconfiguration import RTCBundlePolicy - -logging.getLogger("aiortc").setLevel(logging.WARNING) - -API_BASE = "https://bk.salutejazz.ru" -JAZZ_HEADERS = {"X-Jazz-ClientId": str(uuid.uuid4()), "X-Jazz-AuthType": "ANONYMOUS", "X-Client-AuthType": "ANONYMOUS", "Content-Type": "application/json"} -TEST_MESSAGES = ["Hello Jazz DC!", "Hello world", "X" * 100, "Final test"] - -def _pb_varint(v: int) -> bytes: - b = bytearray() - while v > 0x7F: b.append((v & 0x7F) | 0x80); v >>= 7 - b.append(v & 0x7F) - return bytes(b) - -def _pb_field(f: int, w: int, d: bytes) -> bytes: - t = _pb_varint((f << 3) | w) - return t + d if w == 0 else (t + _pb_varint(len(d)) + d if w == 2 else t + d) - -def _read_varint(s: io.BytesIO) -> int | None: - res, shift = 0, 0 - while b := s.read(1): - res |= (b[0] & 0x7F) << shift - if not (b[0] & 0x80): return res - shift += 7 - return None - -def encode_data_packet(payload: bytes, topic: str = "") -> bytes: - uf = _pb_field(2, 2, payload) + (_pb_field(4, 2, topic.encode()) if topic else b"") + _pb_field(8, 2, str(uuid.uuid4()).encode()) - return _pb_field(1, 0, _pb_varint(0)) + _pb_field(2, 2, uf) - -def decode_data_packet(raw: bytes) -> tuple[bytes, str] | None: - s = io.BytesIO(raw) - ud = None - while (tg := _read_varint(s)) is not None: - wt = tg & 0x07 - if wt == 0: _read_varint(s) - elif wt == 2: - l = _read_varint(s) - if l is None: break - d = s.read(l) - if (tg >> 3) == 2: ud = d - elif wt == 1: s.read(8) - elif wt == 5: s.read(4) - else: break - if ud is None: return None - p, t, ins = b"", "", io.BytesIO(ud) - while (tg := _read_varint(ins)) is not None: - wt = tg & 0x07 - if wt == 0: _read_varint(ins) - elif wt == 2: - l = _read_varint(ins) - if l is None: break - d = ins.read(l) - fn = tg >> 3 - if fn == 2: p = d - elif fn == 4: t = d.decode(errors="replace") - elif wt == 1: ins.read(8) - elif wt == 5: ins.read(4) - else: break - return p, t - -async def _create_peer(name: str, room: dict, session: aiohttp.ClientSession, is_server: bool = False, stats: dict = None) -> dict: - ws = await session.ws_connect(room["connectorUrl"]) - await ws.send_json({"roomId": room["roomId"], "event": "join", "requestId": str(uuid.uuid4()), "payload": {"password": room["password"], "participantName": name, "supportedFeatures": {"attachedRooms": True, "sessionGroups": True}, "isSilent": False}}) - - peer = {"ws": ws, "pc_sub": None, "pc_pub": None, "dc": None, "ready": asyncio.Event(), "sub_ready": asyncio.Event()} - group_id, p_ice_sub, p_ice_pub = None, [], [] - ice_servers = [] - - async def ws_loop(): - nonlocal group_id - async for msg in ws: - if msg.type == aiohttp.WSMsgType.TEXT: - data = json.loads(msg.data) - ev = data.get("event", "") - p = data.get("payload", {}) - m = p.get("method", "") - - if ev == "join-response": group_id = p.get("participantGroup", {}).get("groupId") - elif ev == "media-out" and m == "rtc:config": - for s in p.get("configuration", {}).get("iceServers", []): - urls = [u for u in s.get("urls", []) if "transport=udp" in u] - if urls: ice_servers.append(RTCIceServer(urls=[urls[0]], username=s.get("username"), credential=s.get("credential"))) - - elif ev == "media-out" and m == "rtc:offer" and not peer["pc_sub"]: - peer["pc_sub"] = RTCPeerConnection(configuration=RTCConfiguration(iceServers=ice_servers, bundlePolicy=RTCBundlePolicy.MAX_BUNDLE)) - @peer["pc_sub"].on("connectionstatechange") - def _(): - if peer["pc_sub"].connectionState == "connected": peer["sub_ready"].set() - - @peer["pc_sub"].on("datachannel") - def on_dc(ch): - if ch.label != "_reliable": return - @ch.on("message") - def on_msg(msg_data): - parsed = decode_data_packet(msg_data if isinstance(msg_data, bytes) else msg_data.encode()) - if not parsed or parsed[1] != "poc": return - stats["recv"] += 1 - if is_server and peer["dc"]: - try: - peer["dc"].send(encode_data_packet(f"Echo: {parsed[0].decode()}".encode(), "poc")) - stats["sent"] += 1 - except: pass - - @peer["pc_sub"].on("icecandidate") - async def on_sub_ice(e): - if e and e.candidate and group_id: - await ws.send_json({"roomId": room["roomId"], "event": "media-in", "groupId": group_id, "requestId": str(uuid.uuid4()), "payload": {"method": "rtc:ice", "rtcIceCandidates": [{"candidate": e.candidate.candidate, "sdpMid": e.candidate.sdpMid, "sdpMLineIndex": e.candidate.sdpMLineIndex, "usernameFragment": "", "target": "SUBSCRIBER"}]}}) - - await peer["pc_sub"].setRemoteDescription(RTCSessionDescription(sdp=p["description"]["sdp"], type="offer")) - ans = await peer["pc_sub"].createAnswer() - await peer["pc_sub"].setLocalDescription(ans) - await ws.send_json({"roomId": room["roomId"], "event": "media-in", "groupId": group_id, "requestId": str(uuid.uuid4()), "payload": {"method": "rtc:answer", "description": {"type": "answer", "sdp": peer["pc_sub"].localDescription.sdp}}}) - for c in p_ice_sub: - pts = c.get("candidate","").split() - if len(pts) >= 8: await peer["pc_sub"].addIceCandidate(RTCIceCandidate(int(pts[1]), pts[0].split(":")[1], pts[4], int(pts[5]), int(pts[3]), pts[2], pts[7], str(c.get("sdpMid", "0")), c.get("sdpMLineIndex", 0))) - p_ice_sub.clear() - await asyncio.sleep(0.3) - - peer["pc_pub"] = RTCPeerConnection(configuration=RTCConfiguration(iceServers=ice_servers, bundlePolicy=RTCBundlePolicy.MAX_BUNDLE)) - peer["pc_pub"].addTrack(AudioStreamTrack()) - peer["dc"] = peer["pc_pub"].createDataChannel("_reliable", ordered=True) - - @peer["dc"].on("open") - def on_open(): peer["ready"].set() - - @peer["dc"].on("message") - def on_pub_msg(msg_data): - parsed = decode_data_packet(msg_data if isinstance(msg_data, bytes) else msg_data.encode()) - if parsed and parsed[1] == "poc": stats["recv"] += 1 - - @peer["pc_pub"].on("icecandidate") - async def on_pub_ice(e): - if e and e.candidate and group_id: - await ws.send_json({"roomId": room["roomId"], "event": "media-in", "groupId": group_id, "requestId": str(uuid.uuid4()), "payload": {"method": "rtc:ice", "rtcIceCandidates": [{"candidate": e.candidate.candidate, "sdpMid": e.candidate.sdpMid, "sdpMLineIndex": e.candidate.sdpMLineIndex, "usernameFragment": "", "target": "PUBLISHER"}]}}) - - await ws.send_json({"roomId": room["roomId"], "event": "media-in", "groupId": group_id, "requestId": str(uuid.uuid4()), "payload": {"method": "rtc:track:add", "cid": str(uuid.uuid4()), "track": {"type": "AUDIO", "source": "MICROPHONE", "muted": True}}}) - pub_offer = await peer["pc_pub"].createOffer() - await peer["pc_pub"].setLocalDescription(pub_offer) - await ws.send_json({"roomId": room["roomId"], "event": "media-in", "groupId": group_id, "requestId": str(uuid.uuid4()), "payload": {"method": "rtc:offer", "description": {"type": "offer", "sdp": peer["pc_pub"].localDescription.sdp}}}) - - elif ev == "media-out" and m == "rtc:answer" and peer["pc_pub"]: - await peer["pc_pub"].setRemoteDescription(RTCSessionDescription(sdp=p["description"]["sdp"], type="answer")) - for c in p_ice_pub: - pts = c.get("candidate","").split() - if len(pts) >= 8: await peer["pc_pub"].addIceCandidate(RTCIceCandidate(int(pts[1]), pts[0].split(":")[1], pts[4], int(pts[5]), int(pts[3]), pts[2], pts[7], str(c.get("sdpMid", "0")), c.get("sdpMLineIndex", 0))) - p_ice_pub.clear() - - elif ev == "media-out" and m == "rtc:ice": - for c in p.get("rtcIceCandidates", []): - pts = c.get("candidate","").split() - if len(pts) < 8: continue - ice = RTCIceCandidate(int(pts[1]), pts[0].split(":")[1], pts[4], int(pts[5]), int(pts[3]), pts[2], pts[7], str(c.get("sdpMid", "0")), c.get("sdpMLineIndex", 0)) - tgt = c.get("target") - if tgt == "SUBSCRIBER": (await peer["pc_sub"].addIceCandidate(ice)) if peer["pc_sub"] else p_ice_sub.append(c) - elif tgt == "PUBLISHER": (await peer["pc_pub"].addIceCandidate(ice)) if peer["pc_pub"] else p_ice_pub.append(c) - - async def _keep(): - while not ws.closed: - await asyncio.sleep(5) - if group_id: await ws.send_json({"roomId": room["roomId"], "event": "media-in", "groupId": group_id, "requestId": str(uuid.uuid4()), "payload": {"method": "rtc:ping", "ping_req": {"timestamp": int(time.time()*1000), "rtt": 0}}}) - - peer["task"] = asyncio.create_task(ws_loop()) - peer["keep"] = asyncio.create_task(_keep()) - return peer - -async def run_poc() -> dict: - print("\n--- SaluteJazz PoC ---") - results = {"server_ok": False, "client_ok": False, "sent": 0, "recv": 0, "errors": []} - s_stats, c_stats = {"sent": 0, "recv": 0}, {"sent": 0, "recv": 0} - - async with aiohttp.ClientSession() as session: - try: - r = await session.post(f"{API_BASE}/room/create-meeting", headers=JAZZ_HEADERS, json={"title": "PoC", "guestEnabled": True, "lobbyEnabled": False}) - rj = await r.json() - r2 = await session.post(f"{API_BASE}/room/{rj['roomId']}/preconnect", headers=JAZZ_HEADERS, json={"password": rj["password"], "jazzNextMigration": {"b2bBaseRoomSupport": True, "demoRoomBaseSupport": True, "demoRoomVersionSupport": 2, "mediaWithoutAutoSubscribeSupport": True}}) - room_inf = {"roomId": rj["roomId"], "password": rj["password"], "connectorUrl": (await r2.json())["connectorUrl"]} - except Exception as e: - results["errors"].append(f"Auth fail: {e}") - return results - - print("[1/3] Connecting Server & Client...") - try: - server = await _create_peer("Server", room_inf, session, is_server=True, stats=s_stats) - await asyncio.wait_for(server["ready"].wait(), 15.0) - results["server_ok"] = True - - client = await _create_peer("Client", room_inf, session, is_server=False, stats=c_stats) - await asyncio.wait_for(client["ready"].wait(), 15.0) - results["client_ok"] = True - print(" :P Peers connected") - except Exception as e: - results["errors"].append(str(e)) - return results - - print("\n[2/3] Exchanging messages...") - await asyncio.sleep(1) - for idx, msg in enumerate(TEST_MESSAGES, 1): - try: - client["dc"].send(encode_data_packet(msg.encode(), "poc")) - c_stats["sent"] += 1 - print(f" -> Sent: {msg}") - await asyncio.sleep(0.5) - except Exception as e: - results["errors"].append(f"Sending {idx} failed: {str(e)}") - - await asyncio.sleep(3) - results["sent"], results["recv"] = c_stats["sent"], c_stats["recv"] - - print("\n[3/3] Cleaning up...") - for p in (server, client): - for t in ["task", "keep"]: p[t].cancel() - await p["ws"].close() - for pc in [p["pc_sub"], p["pc_pub"]]: - if pc: await pc.close() - - return results - -def print_results(res: dict): - print("\n--- TEST RESULTS ---") - print(f"Server: {':P' if res['server_ok'] else 'X'} / Client: {':P' if res['client_ok'] else 'X'}") - print(f"Messages: Sent {res['sent']} / Recv {res['recv']}") - if res['errors']: - for e in res['errors']: print(f" Error: {e}") - print(f"\n{':P SUCCESS' if res['sent'] and res['sent'] == res['recv'] else 'X FAILED'}\n") - -if __name__ == "__main__": - try: res = asyncio.run(run_poc()); print_results(res) - except KeyboardInterrupt: pass diff --git a/docker-compose.client.yml b/docker-compose.client.yml index 0ea453b..7447e74 100644 --- a/docker-compose.client.yml +++ b/docker-compose.client.yml @@ -8,7 +8,7 @@ services: network_mode: host environment: OLCRTC_MODE: cnc - OLCRTC_CARRIER: "${OLCRTC_CARRIER:?set OLCRTC_CARRIER (jitsi, telemost, jazz, wbstream, none)}" + OLCRTC_CARRIER: "${OLCRTC_CARRIER:?set OLCRTC_CARRIER (jitsi, telemost, wbstream, none)}" OLCRTC_TRANSPORT: "${OLCRTC_TRANSPORT:-datachannel}" OLCRTC_ROOM_ID: "${OLCRTC_ROOM_ID:?set OLCRTC_ROOM_ID to the server room}" OLCRTC_KEY: "${OLCRTC_KEY:?set OLCRTC_KEY to the server encryption key}" diff --git a/docker-compose.server.yml b/docker-compose.server.yml index 8cb73d5..a3f63b3 100644 --- a/docker-compose.server.yml +++ b/docker-compose.server.yml @@ -7,7 +7,7 @@ services: restart: unless-stopped environment: OLCRTC_MODE: srv - OLCRTC_CARRIER: "${OLCRTC_CARRIER:?set OLCRTC_CARRIER (jitsi, telemost, jazz, wbstream, none)}" + OLCRTC_CARRIER: "${OLCRTC_CARRIER:?set OLCRTC_CARRIER (jitsi, telemost, wbstream, none)}" OLCRTC_TRANSPORT: "${OLCRTC_TRANSPORT:-datachannel}" OLCRTC_ROOM_ID: "${OLCRTC_ROOM_ID:-}" OLCRTC_KEY: "${OLCRTC_KEY:-}" diff --git a/docs/about.md b/docs/about.md index a67149d..ff980bf 100644 --- a/docs/about.md +++ b/docs/about.md @@ -43,12 +43,12 @@ Классические обходы через VPS ломаются когда VPS не попадает в белый список. Yandex Cloud, VK Cloud, Timeweb в списке - но провайдеры активно банят инстансы используемые как прокси. -**Решение olcRTC**: не пытаться попасть в белый список - использовать сервисы, которые там уже есть навсегда. Телемост, SaluteJazz и WB Stream - сервисы видеозвонков крупных российских компаний. Пока они живы, olcRTC работает. Чтобы их заблокировать - нужно заблокировать сам сервис. +**Решение olcRTC**: не пытаться попасть в белый список - использовать сервисы, которые там уже есть навсегда. Телемост и WB Stream - сервисы видеозвонков крупных российских компаний. Пока они живы, olcRTC работает. Чтобы их заблокировать - нужно заблокировать сам сервис. Трафик идёт через WebRTC SFU этих сервисов: ``` -Клиент (cnc) → SFU Яндекса/Сбера/WB → Сервер (srv, ваш VPS) +Клиент (cnc) → SFU Яндекса/WB → Сервер (srv, ваш VPS) ``` Для ТСПУ это выглядит как обычный видеозвонок. @@ -79,9 +79,9 @@ **2026-04-08..09** - активная Go разработка: клиент-серверная архитектура, кастомный мультиплексор с sequence numbering, имена участников из файла, graceful shutdown, DNS поддержка, Android мост. -**2026-04-10..11** - простой UI, Docker образ сервера, SaluteJazz PoC от community-контрибутора `0xcodepunk`. +**2026-04-10..11** - простой UI, Docker образ сервера. -**2026-04-12..14** - большой рефакторинг: golangci-lint, Jazz провайдер с protobuf-style пакетами, автогенерация Room ID для Jazz, Windows скрипты от `DeNcHiK3713`. +**2026-04-12..14** - большой рефакторинг: golangci-lint, Windows скрипты от `DeNcHiK3713`. **2026-04-19..20** - архитектурный рефакторинг: выделение слоёв `carrier` / `transport` / `link`, WB Stream провайдер через LiveKit SDK, видеоканальный PoC на Python. @@ -95,8 +95,8 @@ **2026-05-11..14** - большой архитектурный рефакторинг `refactor/universal-carrier`: - Разделение `internal/provider/` на `internal/engine/` (wire-level SFU протоколы) + `internal/auth/` (HTTP/API авторизация) -- Три engine: `livekit` (WB Stream), `goolom` (Telemost), `salutejazz` (Jazz) -- Три auth: `wbstream`, `telemost`, `salutejazz` +- Два основных engine: `livekit` (WB Stream), `goolom` (Telemost) +- Auth-провайдеры: `wbstream`, `telemost`, `jitsi` - Замена `-carrier` на `-auth`/`-engine`/`-url`/`-token` - Публичный Go API `pkg/olcrtc` (net.Conn через Session.Dial) для встраивания в sing-box и другие - `cmd/olcrtc-cgo` — C-shared библиотека с Ping API @@ -104,7 +104,6 @@ - Протокол handshake (`internal/handshake/`) с CLIENT_HELLO/SERVER_WELCOME - Session callbacks: OnSessionOpen, OnSessionClose, OnTraffic - Перевод документации на русский -- E2E тесты: jazz non-data транспорты помечены как expected fail ### Статья на Хабре @@ -129,10 +128,10 @@ Transport (datachannel / vp8channel / seichannel / videochannel) │ ▼ - Carrier (jazz / wbstream / telemost) + Carrier (wbstream / telemost / jitsi) │ WebRTC DataChannel или VideoTrack ▼ - SFU Яндекса / Сбера / WB ← сервер в белом списке у всех провайдеров + SFU Яндекса / WB / Jitsi ← сервер в белом списке у всех провайдеров │ ▼ Transport (datachannel / vp8channel / seichannel / videochannel) @@ -148,7 +147,7 @@ Сервер (`srv`) стоит на вашем VPS. Он подключается к той же комнате видеозвонка, получает зашифрованный поток и от своего имени делает TCP соединения к нужным адресам в интернете. -ТСПУ видит трафик к IP Яндекса/Сбера/WB с корректным TLS и SNI - ничем не отличается от обычного видеозвонка. +ТСПУ видит трафик к IP выбранного сервиса с корректным TLS и SNI - ничем не отличается от обычного видеозвонка. --- @@ -185,12 +184,12 @@ internal/carrier/ интерфейс Carrier + реестр internal/engine/ Wire-level SFU протоколы (URL+Token → WebRTC) ├── livekit/ LiveKit (WB Stream) ├── goolom/ Goolom (Yandex Telemost) - └── salutejazz/ SaluteJazz (Сбер) + └── jitsi/ Jitsi Meet │ internal/auth/ HTTP/API авторизация → Credentials для engine ├── wbstream/ WB Stream API (guest register, join, token) ├── telemost/ Yandex Telemost (connection-info) - └── salutejazz/ SaluteJazz (create-meeting, preconnect) + └── jitsi/ Jitsi room URL parsing │ internal/crypto/ ChaCha20-Poly1305 AEAD internal/names/ генератор имён участников @@ -305,7 +304,7 @@ internal/e2e/ E2E тесты на реальных провайдер | `carrier.go` | Интерфейс `Session` + реестр. `Capabilities` описывает что умеет carrier: ByteStream и/или VideoTrack | | `bytestream.go` | `ByteStream` и `VideoTrack` интерфейсы | | `carrier_test.go` | Тесты | -| `builtin/register.go` | Регистрирует jazz, telemost, wbstream, none в реестре carrier через `registerEngineAuth` (связывает auth provider с engine) и `registerDirect` (прямое подключение без auth) | +| `builtin/register.go` | Регистрирует telemost, wbstream, jitsi, none в реестре carrier через `registerEngineAuth` (связывает auth provider с engine) и `registerDirect` (прямое подключение без auth) | | `builtin/engine_adapter.go` | Адаптер `engine.Session` → `carrier.Session`. Связывает auth provider (Issue → Credentials) с engine (Connect с URL+Token). Поддерживает Refresh callback для engines, требующих свежие credentials при реконнекте (Goolom) | ### `internal/engine/` @@ -315,7 +314,7 @@ internal/e2e/ E2E тесты на реальных провайдер | `engine.go` | Интерфейс `Session` (Connect, Send, Close, WatchConnection, CanSend и т.д.) + `Factory` + реестр. `Config` содержит URL, Token, Extra, OnData, DNSServer, Refresh callback. `Capabilities`: ByteStream, VideoTrack | | `livekit/engine.go` | LiveKit engine — используется WB Stream. Подключается через LiveKit SDK, публикует/подписывается на DataChannel и VideoTrack | | `goolom/engine.go` | Goolom engine — проприетарный протокол Яндекса (Telemost). WebSocket signaling, dual pub/sub PeerConnections, DataChannel, telemetry. Использует `Refresh` callback для получения свежих credentials при реконнекте | -| `salutejazz/engine.go` | SaluteJazz engine — протокол Сбера. WebSocket + SDP signaling, pub/sub split, `_reliable` DataChannel, length-prefixed DataPacket envelope | +| `jitsi/engine.go` | Jitsi engine — MUC/Jingle/colibri-ws, byte stream через bridge channel и best-effort VideoTrack | ### `internal/auth/` @@ -324,7 +323,7 @@ internal/e2e/ E2E тесты на реальных провайдер | `auth.go` | Интерфейс `Provider` (Engine, DefaultServiceURL, Issue) + `RoomCreator` + реестр. `Credentials`: URL, Token, Extra | | `wbstream/provider.go` | WB Stream auth: guest register → join room → token exchange. Реализует `RoomCreator`. `Engine()` → `"livekit"`, `DefaultServiceURL()` → `"https://stream.wb.ru"` | | `telemost/provider.go` | Yandex Telemost auth: HTTP connection-info → engine credentials. `Engine()` → `"goolom"`, `DefaultServiceURL()` → `"https://telemost.yandex.ru"` | -| `salutejazz/provider.go` | SaluteJazz auth: create-meeting + preconnect flow. Реализует `RoomCreator`. `Engine()` → `"salutejazz"`. Принимает room в формате `:` | +| `jitsi/provider.go` | Jitsi auth: разбирает URL комнаты и передаёт параметры engine. `Engine()` → `"jitsi"` | ### `internal/crypto/` @@ -383,8 +382,6 @@ internal/e2e/ E2E тесты на реальных провайдер | `telemost_poc_datachannel.py` | Базовый PoC: два гостя в одной Telemost комнате, обмен данными через DataChannel | | `telemost_poc_videochannel.py` | Передача данных QR-кодами в видеопотоке Telemost | | `telemost_info.py` | Сбор полной информации о Telemost конференции: участники, кодеки, ICE серверы, SDP | -| `jazz_poc_datachannel.py` | PoC DataChannel через SaluteJazz | -| `jazz_info.py` | Информация о Jazz конференции | | `wbstream_poc_datachannel.py` | PoC DataChannel через WB Stream | | `wbstream_poc_videochannel.py` | PoC видеоканала через WB Stream | | `wbstream_info.py` | Информация о WB Stream комнате | @@ -426,16 +423,7 @@ internal/e2e/ E2E тесты на реальных провайдер ## 6. Carriers - провайдеры -Carrier - это WebRTC сервис видеозвонков, через который идёт туннель. Все три в белых списках у российских провайдеров. - -### SaluteJazz (`jazz`) - -- Сервис видеозвонков от Сбера: `salutejazz.ru` -- Не требует регистрации для участника (только организатор) -- DataChannel работает, но Jazz **банит IP** за паттерны трафика характерные для DataChannel туннеля -- VideoTrack **не работает** для туннелирования (все non-data транспорты fail в E2E тестах) -- Поддерживает автогенерацию Room ID (`mode: gen`) -- Инициализация звонка изнутри автоматически реализована +Carrier - это WebRTC сервис видеозвонков, через который идёт туннель. ### Yandex Telemost (`telemost`) @@ -467,7 +455,7 @@ Transport определяет как именно данные упаковыв - Лимит payload: 12KB на сообщение (ограничение SFU) - Надёжный, упорядоченный (SCTP гарантирует) -- Работает только с jazz (но Jazz банит IP за паттерны трафика) +- Работает с Jitsi и direct engine-сценариями - Telemost удалил DataChannel - WB Stream DataChannel **не работает** в обычном guest flow — токены выдаются с `canPublishData=false` @@ -476,7 +464,6 @@ Transport определяет как именно данные упаковыв Данные упаковываются в VP8 видеофреймы. Поверх этого строится KCP - надёжный протокол с повторной передачей, работающий поверх ненадёжного канала. - Работает с telemost и wbstream (pass в E2E тестах) -- Jazz не поддерживает VideoTrack для туннелирования (fail) - Большой пинг из-за батчинга фреймов - KCP параметры: MTU 1400, окно 4096, conv ID `0xC0FFEE01` - Рекомендуется: `vp8.fps: 60`, `vp8.batch_size: 64` @@ -489,7 +476,7 @@ Transport определяет как именно данные упаковыв - UUID для SEI payload: `5dc03ba8-450f-4b55-9a77-1f916c5b0739` - ACK timeout (по умолчанию 3с), фрагментация, ретрансмиссия до 4 попыток - Работает только с wbstream (pass в E2E тестах) -- Telemost и Jazz не поддерживают (fail) +- Telemost не поддерживает (fail) - Рекомендуется: `sei.fps: 60`, `sei.batch_size: 64`, `sei.fragment_size: 900`, `sei.ack_timeout_ms: 2000` ### videochannel @@ -500,7 +487,7 @@ Transport определяет как именно данные упаковыв **tile** - тайловый кодек, только 1080x1080. Пиксели кодируют биты напрямую. Reed-Solomon коррекция ошибок. Параметры: размер тайла в пикселях (1..270), процент избыточности (0..200). Быстрее QR но нестабильнее. -Общее: ffmpeg как subprocess, поддержка NVENC, VP8 видеопоток. Самый медленный транспорт. Работает стабильно с wbstream, best effort с telemost, не работает с jazz. +Общее: ffmpeg как subprocess, поддержка NVENC, VP8 видеопоток. Самый медленный транспорт. Работает стабильно с wbstream, best effort с telemost. --- @@ -583,10 +570,6 @@ Community Android клиент: [alananisimov/olcbox](https://github.com/alanani - `telemost_poc_videochannel.py` - QR в видео, `vcsend.py` - передача файлов - `telemost_info.py` - полный дамп SDP, ICE серверов, участников -**Jazz:** -- `jazz_poc_datachannel.py` - DataChannel через Jazz SFU -- `jazz_info.py` - информация о конференции - **WB Stream:** - `wbstream_poc_datachannel.py` - DataChannel - `wbstream_poc_videochannel.py` - видеоканал @@ -713,7 +696,7 @@ olcrtc config.yaml |---|---| | `mode` | `srv` - сервер, `cnc` - клиент, `gen` - генерация Room ID | | `link` | Всегда `direct` | -| `auth.provider` | `telemost`, `jazz`, `wbstream` или `none` | +| `auth.provider` | `telemost`, `wbstream`, `jitsi` или `none` | | `room.id` | Room ID | | `crypto.key` | Ключ шифрования hex 64 символа | | `net.transport` | `datachannel`, `vp8channel`, `seichannel`, `videochannel` | @@ -790,19 +773,17 @@ olcrtc://wbstream?vp8channel@room-01#key$RU / free ## 16. Матрица совместимости -| Transport | telemost | jazz | wbstream | +| Transport | telemost | wbstream | jitsi | |---|:---:|:---:|:---:| -| datachannel | `-` | `+` | `-` | -| vp8channel | `+` | `-` | `+` | -| seichannel | `-` | `-` | `+` | -| videochannel | `~` | `-` | `+` | +| datachannel | `-` | `-` | `+` | +| vp8channel | `+` | `+` | `~` | +| seichannel | `-` | `+` | `~` | +| videochannel | `~` | `+` | `~` | - `+` работает (pass в E2E тестах) - `-` не работает / не поддерживается (fail в E2E тестах) - `~` best effort (может работать, но нестабильно) -**Jazz:** только datachannel проходит E2E тесты. Все non-data транспорты (vp8channel, seichannel, videochannel) помечены как expected fail — Jazz не поддерживает VideoTrack для туннелирования. Кроме того, Jazz **банит IP** за паттерны datachannel трафика. - **Telemost:** только vp8channel стабильно проходит. DataChannel удалён из Telemost. seichannel не поддерживается. videochannel — best effort. **WBStream:** все транспорты кроме datachannel работают. DataChannel помечен как expected fail — в обычном guest flow WB Stream выдаёт токены с `canPublishData=false`, и DC не маршрутизирует данные. Для DC нужны модераторские/permission права. @@ -858,14 +839,6 @@ WB Stream - текущий приоритет. Основа уже реализ - [ ] Авто перезапуск звонка - [ ] TLS стек Chrome -**Issue #1 - реализовать поддержку salutejazz.ru** `enhancement` - -- [ ] Симуляция XHR телеметрии -- [ ] Симуляция задержек -- [ ] Система завершения звонка -- [ ] Авто перезапуск звонка -- [ ] TLS стек Chrome - ### Закрытые (уже сделано) | Issue | Что было | @@ -899,7 +872,6 @@ WB Stream - текущий приоритет. Основа уже реализ | **Kot-nikot** | 3 | Фиксы | | **HLNikNiky** / Sesdear | 2 | URI добавление, фиксы | | **Denis Suchok** / DeNcHiK3713 | 1 | Windows Podman скрипты | -| **0xcodepunk** | 1 | SaluteJazz PoC DataChannel (issue #10) | | **scalebb2** | 1 | - | --- diff --git a/docs/configuration.md b/docs/configuration.md index 07d1713..e4fd98f 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -19,7 +19,7 @@ olcrtc /etc/olcrtc/server.yaml |------------------------------------------------------------------|-----------------------------------------------------------| | `mode` | `srv`, `cnc`, or `gen` | | `link` | `direct` | -| `auth.provider` | `jitsi`, `telemost`, `jazz`, `wbstream`, `none` | +| `auth.provider` | `jitsi`, `telemost`, `wbstream`, `none` | | `room.id` | conference room id | | `crypto.key` / `crypto.key_file` | 64-char hex (32 bytes), inline or read from file | | `net.transport` | `datachannel`, `videochannel`, `seichannel`, `vp8channel` | diff --git a/docs/fast.md b/docs/fast.md index 94de631..a7c7216 100644 --- a/docs/fast.md +++ b/docs/fast.md @@ -96,9 +96,8 @@ cd olcrtc Select auth provider: 1) jitsi 2) telemost - 3) jazz - 4) wbstream -Enter choice [1-4, default: 1]: + 3) wbstream +Enter choice [1-3, default: 1]: ``` Выбери сервис. Полную матрицу совместимости смотри в [settings.md](settings.md). @@ -117,7 +116,7 @@ Enter choice [1-4, default: 1]: ``` Рекомендации: -- **datachannel** - самый быстрый, минимальный пинг. Стабильно работает с `jitsi` через colibri-ws bridge channel. С `jazz` тоже работает, но Jazz банит IP за паттерны трафика. **WBStream DC не работает** в обычном guest flow (токены без `canPublishData`). **Telemost удалил DC**. +- **datachannel** - самый быстрый, минимальный пинг. Стабильно работает с `jitsi` через colibri-ws bridge channel. **WBStream DC не работает** в обычном guest flow (токены без `canPublishData`). **Telemost удалил DC**. - **vp8channel** - работает с telemost и wbstream, быстрый, но большой пинг. - **seichannel** - работает только с wbstream, медленный, но мелкий пинг. - **videochannel** - работает с wbstream (стабильно) и telemost (best effort), самый медленный и большой пинг. @@ -134,8 +133,6 @@ Enter Room ID: Для **telemost** и **wbstream** - создай руму через сайт ([телемост](https://telemost.yandex.ru/), [wbstream](https://stream.wb.ru)) и вставь её ID. -Для **jazz** скрипт предложит выбор: сгенерировать автоматически (рекомендуется) или ввести существующий ID. При автогенерации скрипт запустит `gen` и получит ID до старта сервера. Также можно создать руму через сайт [jazz](https://salutejazz.ru/calls/create). - ### DNS ``` diff --git a/docs/project-map.md b/docs/project-map.md index d0ebd41..a85d1f4 100644 --- a/docs/project-map.md +++ b/docs/project-map.md @@ -65,7 +65,7 @@ Important fields: | YAML | Runtime field | Notes | |---|---|---| | `mode` | `session.Config.Mode` | `srv`, `cnc`, or `gen`. | -| `auth.provider` | `Auth` | `jitsi`, `telemost`, `jazz`, `wbstream`, or `none`. | +| `auth.provider` | `Auth` | `jitsi`, `telemost`, `wbstream`, or `none`. | | `room.id` | `RoomID` | Carrier-specific room reference. | | `crypto.key` / `crypto.key_file` | `KeyHex` | Shared 32-byte key encoded as 64 hex chars. | | `net.transport` | `Transport` | `datachannel`, `vp8channel`, `seichannel`, or `videochannel`. | @@ -187,7 +187,6 @@ The universal-carrier refactor centers on small registries: ```text carrier "wbstream" -> auth/wbstream -> engine/livekit -carrier "jazz" -> auth/salutejazz -> engine/salutejazz carrier "telemost"-> auth/telemost -> engine/goolom carrier "jitsi" -> auth/jitsi -> engine/jitsi carrier "none" -> direct user-supplied engine/url/token @@ -200,7 +199,6 @@ carrier "none" -> direct user-supplied engine/url/token | `jitsi` | `jitsi` | No | Parses host/room from a public or self-hosted Jitsi URL. No HTTP auth. | | `telemost` | `goolom` | No | Calls Telemost room-info flow and returns Goolom credentials. | | `wbstream` | `livekit` | Yes | Registers guest, optionally creates room, joins room, fetches LiveKit token. | -| `jazz` / `salutejazz` | `salutejazz` | Yes | Creates or joins SaluteJazz room and returns room/password tuple. | | `none` | chosen by config | No | Direct engine mode for downstream tools or self-hosted SFUs. | ## Engines @@ -212,7 +210,6 @@ Engines expose the low-level service/SFU protocol. | `livekit` | `internal/engine/livekit` | Yes | Yes | LiveKit SDK room, data packets, local/remote tracks, reconnect with credential refresh. | | `goolom` | `internal/engine/goolom` | Yes | Yes | Yandex Telemost/Goolom signaling, split publisher/subscriber peer connections, telemetry/keepalive. | | `jitsi` | `internal/engine/jitsi` | Yes | Best effort | Jitsi MUC/Jingle/colibri-ws plus optional video track negotiation. | -| `salutejazz` | `internal/engine/salutejazz` | Yes | Yes | SaluteJazz WebSocket signaling and split media peer connections. | Engine work is where most provider breakage and reconnect complexity lives. @@ -223,7 +220,7 @@ either a byte stream or a video track. | Transport | Primitive | Reliability model | Best fit | Notes | |---|---|---|---|---| -| `datachannel` | Carrier byte stream | Native reliable ordered messages | Jitsi, direct engines, some Jazz cases | Simple pass-through with 12 KiB message cap. | +| `datachannel` | Carrier byte stream | Native reliable ordered messages | Jitsi and direct engines | Simple pass-through with 12 KiB message cap. | | `vp8channel` | VP8 video track | KCP over VP8-looking frames | WB Stream and Telemost-style video paths | Highest-performance video-path transport. Uses epochs and binding tokens to survive restarts/loopback. | | `seichannel` | H264 SEI video track | Custom fragments + ACK/retry | WB Stream fallback | Carries data in SEI NAL units with fragmentation, CRC, ACK. | | `videochannel` | Visual frames via ffmpeg | QR/tile frames + ACK/retry | Experimental/inspection-friendly path | Encodes visual payload frames, requires ffmpeg, supports QR and tile codecs. | diff --git a/docs/server.example.yaml b/docs/server.example.yaml index edd5fcb..b57698d 100644 --- a/docs/server.example.yaml +++ b/docs/server.example.yaml @@ -7,10 +7,10 @@ mode: srv link: direct # p2p link type auth: - provider: jitsi # jitsi | telemost | jazz | wbstream | none + provider: jitsi # jitsi | telemost | wbstream | none # For jitsi: full conference URL (https://host/room or host/room). -# For telemost / wbstream / jazz: room ID returned by the service. +# For telemost / wbstream: room ID returned by the service. room: id: "https://meet.small-dm.ru/REPLACE_WITH_ROOM_NAME" @@ -45,7 +45,7 @@ socks: # Direct engine mode — only used when auth.provider is "none" engine: - name: "" # livekit | goolom | salutejazz | jitsi + name: "" # livekit | goolom | jitsi url: "" token: "" diff --git a/docs/settings.md b/docs/settings.md index 2e564af..c855750 100644 --- a/docs/settings.md +++ b/docs/settings.md @@ -12,20 +12,18 @@ ## Матрица совместимости -| Transport | telemost | jazz | wbstream | jitsi | -|-----------|:--------:|:----:|:--------:|:-----:| -| datachannel | - | ~ | ~ | + | -| vp8channel | + | - | + | ~ | -| seichannel | - | - | + | ~ | -| videochannel | + | - | + | ~ | +| Transport | telemost | wbstream | jitsi | +|-----------|:--------:|:--------:|:-----:| +| datachannel | - | ~ | + | +| vp8channel | + | + | ~ | +| seichannel | - | + | ~ | +| videochannel | + | + | ~ | **Легенда:** - `+` - работает (pass в E2E тестах) - `-` - не работает / не поддерживается (fail в E2E тестах) - `~` - нестабильно (может работать, но нестабильно) -**Jazz:** только datachannel проходит E2E тесты. Все non-data транспорты (vp8channel, seichannel, videochannel) не работают — Jazz не поддерживает VideoTrack для туннелирования. Кроме того, Jazz **банит IP** за паттерны datachannel трафика. - **Telemost:** только vp8channel стабильно проходит. DataChannel удалён из Telemost. seichannel не поддерживается. videochannel — best effort. **WBStream:** все транспорты кроме datachannel работают. DataChannel в обычном guest flow без выдавания модератора не работает — WB Stream выдаёт токены с `canPublishData=false`, и DC не маршрутизирует данные. @@ -45,7 +43,7 @@ | YAML поле | Что вводить | |-----------|-------------| | `mode` | `srv` на сервере, `cnc` на клиенте, `gen` для генерации Room ID | -| `auth.provider` | `telemost`, `jazz`, `wbstream` или `jitsi` | +| `auth.provider` | `telemost`, `wbstream` или `jitsi` | | `net.transport` | `datachannel`, `vp8channel`, `seichannel` или `videochannel` | | `room.id` | Room ID | | `crypto.key` или `crypto.key_file` | Ключ шифрования hex 64 символа. Генерация: `openssl rand -hex 32` | @@ -99,13 +97,13 @@ transport. Используй одинаковые traffic-настройки н ## mode: gen -Генерирует Room ID заранее, не запуская сервер. Поддерживается для auth-провайдеров с автосозданием комнат: `jazz` и `wbstream`. Для `telemost` комнату нужно создавать вручную через сайт. +Генерирует Room ID заранее, не запуская сервер. Поддерживается для auth-провайдеров с автосозданием комнат: `wbstream`. Для `telemost` комнату нужно создавать вручную через сайт. **Обязательные поля:** | YAML поле | Описание | |-----------|----------| -| `auth.provider` | `jazz` или `wbstream` | +| `auth.provider` | `wbstream` | | `net.dns` | DNS-сервер | | `gen.amount` | Количество комнат | diff --git a/docs/uri.md b/docs/uri.md index 1463a3d..1fdb50e 100644 --- a/docs/uri.md +++ b/docs/uri.md @@ -33,7 +33,7 @@ olcrtc://?@#$ | Поле | Значение | |------|----------| -| `` | Имя auth-провайдера, например `telemost`, `jazz`, `wbstream`, `jitsi` | +| `` | Имя auth-провайдера, например `telemost`, `wbstream`, `jitsi` | | `` | Имя транспорта, например `datachannel`, `vp8channel`, `seichannel`, `videochannel` | | payload | Параметры транспорта в ``. Ключи совпадают с YAML полями. Блок опускается если используются defaults | | `` | Идентификатор комнаты или auth-specific room URL/ID | @@ -162,10 +162,10 @@ vp8: data: data ``` -### jazz + seichannel +### wbstream + seichannel ```text -olcrtc://jazz?seichannel@room-01#d823fa01cb3e0609b67322f7cf984c4ee2e4ce2e294936fc24ef38c9e59f4799$DE / olc free sub +olcrtc://wbstream?seichannel@room-01#d823fa01cb3e0609b67322f7cf984c4ee2e4ce2e294936fc24ef38c9e59f4799$DE / olc free sub ``` ### Эквивалент YAML @@ -174,7 +174,7 @@ olcrtc://jazz?seichannel@room-01#d823fa01c mode: cnc link: direct auth: - provider: jazz + provider: wbstream room: id: "room-01" crypto: diff --git a/internal/app/session/session.go b/internal/app/session/session.go index 6610463..759f5aa 100644 --- a/internal/app/session/session.go +++ b/internal/app/session/session.go @@ -29,7 +29,6 @@ const ( modeSRV = "srv" modeCNC = "cnc" modeGen = "gen" - authJazz = "jazz" authNone = "none" transportVideo = "videochannel" transportVP8 = "vp8channel" @@ -64,7 +63,7 @@ var ( ErrAmountRequired = errors.New("amount required for gen mode (set gen.amount)") // ErrAuthRequired indicates that no auth provider was selected. ErrAuthRequired = errors.New( - "auth provider required (set auth.provider to jitsi, telemost, jazz, wbstream or none)") + "auth provider required (set auth.provider to jitsi, telemost, wbstream or none)") // ErrURLRequired indicates that auth.url must be provided when the auth provider has no default URL. ErrURLRequired = errors.New("SFU URL required (set auth.url)") // ErrUnsupportedCarrier indicates that carrier is not registered. @@ -380,7 +379,7 @@ func validateTransportRegistration(cfg Config) error { } func validateCommon(cfg Config) error { - if cfg.RoomID == "" && cfg.Auth != authJazz && cfg.Auth != authNone { + if cfg.RoomID == "" && cfg.Auth != authNone { return ErrRoomIDRequired } if cfg.KeyHex == "" { diff --git a/internal/app/session/session_test.go b/internal/app/session/session_test.go index ca6f38d..a3aa21b 100644 --- a/internal/app/session/session_test.go +++ b/internal/app/session/session_test.go @@ -139,15 +139,6 @@ func TestValidate(t *testing.T) { want error }{ {name: "valid baseline", cfg: base}, - { - name: "jazz allows empty room id", - cfg: func() Config { - cfg := base - cfg.Auth = "jazz" - cfg.RoomID = "" - return cfg - }(), - }, { name: "cnc requires socks host and port", cfg: func() Config { @@ -186,7 +177,7 @@ func TestValidate(t *testing.T) { want: ErrUnsupportedTransport, }, { - name: "room id required for non jazz", + name: "room id required", cfg: func() Config { cfg := base cfg.RoomID = "" @@ -588,10 +579,6 @@ func TestValidateGen(t *testing.T) { name: "valid wbstream", cfg: Config{Auth: testAuthWBStream, DNSServer: "1.1.1.1:53", Amount: 3}, }, - { - name: "valid jazz", - cfg: Config{Auth: "jazz", DNSServer: "1.1.1.1:53", Amount: 1}, - }, { name: "missing auth", cfg: Config{DNSServer: "1.1.1.1:53", Amount: 1}, diff --git a/internal/auth/auth.go b/internal/auth/auth.go index 19613a2..e345f8c 100644 --- a/internal/auth/auth.go +++ b/internal/auth/auth.go @@ -1,7 +1,7 @@ // Package auth defines how room credentials are produced for an engine. // // An auth provider is responsible for any service-specific HTTP / login flow -// (WB Stream, SaluteJazz, Yandex Telemost, Jitsi, ...) and produces a +// (WB Stream, Yandex Telemost, Jitsi, ...) and produces a // Credentials value that an engine can use to connect. Some auth providers // also support creating new rooms; that capability is optional and is // expressed via the RoomCreator interface. diff --git a/internal/auth/salutejazz/api.go b/internal/auth/salutejazz/api.go deleted file mode 100644 index 1137b06..0000000 --- a/internal/auth/salutejazz/api.go +++ /dev/null @@ -1,198 +0,0 @@ -// Package salutejazz is the auth provider for the SaluteJazz service. It -// creates / joins a Jazz room over HTTP and returns the connector -// WebSocket URL, room ID and password that the salutejazz engine consumes. -package salutejazz - -import ( - "bytes" - "context" - "encoding/json" - "errors" - "fmt" - "net/http" - - "github.com/google/uuid" - "github.com/openlibrecommunity/olcrtc/internal/protect" -) - -const ( - authTypeAnonymous = "ANONYMOUS" - headerAccept = "Accept" - headerAuthType = "X-Jazz-AuthType" - headerClientID = "X-Jazz-ClientId" - headerClientType = "X-Client-AuthType" - headerContentType = "Content-Type" - headerJazzUA = "X-Jazz-Ua" - headerOrigin = "Origin" - headerReferer = "Referer" - contentTypeJSON = "application/json" - jazzOrigin = "https://salutejazz.ru" - jazzReferer = jazzOrigin + "/" - jazzUA = "osName=Linux;osVersion=;appName=jazz;appVersion=26.21.7;" + - "surface=WEB;browserName=Firefox;browserVersion=150.0" -) - -var apiBase = "https://bk.salutejazz.ru" //nolint:gochecknoglobals // package-level state intentional - -// roomInfo contains connection details for a SaluteJazz room. -type roomInfo struct { - RoomID string - Password string - ConnectorURL string -} - -var ( - errCreateRoomFailed = errors.New("create room failed") - errPreconnectFailed = errors.New("preconnect failed") -) - -func anonymousHeaders() map[string]string { - return map[string]string{ - headerAccept: "application/json, text/plain, */*", - headerAuthType: authTypeAnonymous, - headerClientID: uuid.New().String(), - headerClientType: authTypeAnonymous, - headerContentType: contentTypeJSON, - headerJazzUA: jazzUA, - headerOrigin: jazzOrigin, - headerReferer: jazzReferer, - } -} - -func createRoom(ctx context.Context) (*roomInfo, error) { - headers := anonymousHeaders() - - createResp, err := createMeeting(ctx, headers) - if err != nil { - return nil, fmt.Errorf("create meeting: %w", err) - } - - connectorURL, err := preconnect(ctx, createResp.RoomID, createResp.Password, headers) - if err != nil { - return nil, fmt.Errorf("preconnect: %w", err) - } - - return &roomInfo{ - RoomID: createResp.RoomID, - Password: createResp.Password, - ConnectorURL: connectorURL, - }, nil -} - -type createResponse struct { - RoomID string `json:"roomId"` - Password string `json:"password"` -} - -func createMeeting(ctx context.Context, headers map[string]string) (*createResponse, error) { - createPayload := map[string]any{ - "title": "Video meeting", - "guestEnabled": true, - "lobbyEnabled": false, - "serverVideoRecordAutoStartEnabled": false, - "sipEnabled": false, - "moderatorEmails": []string{}, - "summarizationEnabled": false, - "room3dEnabled": false, - "room3dScene": "XRLobby", - } - - body, err := json.Marshal(createPayload) - if err != nil { - return nil, fmt.Errorf("marshal create payload: %w", err) - } - - req, err := http.NewRequestWithContext(ctx, http.MethodPost, apiBase+"/room/create-meeting", - bytes.NewReader(body)) - if err != nil { - return nil, fmt.Errorf("create request: %w", err) - } - - for k, v := range headers { - req.Header.Set(k, v) - } - - client := protect.NewHTTPClient() - resp, err := client.Do(req) - if err != nil { - return nil, fmt.Errorf("do create request: %w", err) - } - defer func() { _ = resp.Body.Close() }() - - if resp.StatusCode != http.StatusOK { - return nil, fmt.Errorf("create room status: %w", protect.StatusError(errCreateRoomFailed, resp, 1024)) - } - - var res createResponse - if err := json.NewDecoder(resp.Body).Decode(&res); err != nil { - return nil, fmt.Errorf("decode create response: %w", err) - } - return &res, nil -} - -func preconnect(ctx context.Context, roomID, password string, headers map[string]string) (string, error) { - preconnectPayload := map[string]any{ - "password": password, - "jazzNextMigration": map[string]any{ - "b2bBaseRoomSupport": true, - "demoRoomBaseSupport": true, - "demoRoomVersionSupport": 2, - "mediaWithoutAutoSubscribeSupport": true, - "webinarSpeakerSupport": true, - "webinarViewerSupport": true, - "sdkRoomSupport": true, - "sberclassRoomSupport": true, - }, - } - - preBody, err := json.Marshal(preconnectPayload) - if err != nil { - return "", fmt.Errorf("marshal preconnect payload: %w", err) - } - - preReq, err := http.NewRequestWithContext( - ctx, - http.MethodPost, - fmt.Sprintf("%s/room/%s/preconnect", apiBase, roomID), - bytes.NewReader(preBody), - ) - if err != nil { - return "", fmt.Errorf("create preconnect request: %w", err) - } - - for k, v := range headers { - preReq.Header.Set(k, v) - } - - client := protect.NewHTTPClient() - preResp, err := client.Do(preReq) - if err != nil { - return "", fmt.Errorf("do preconnect request: %w", err) - } - defer func() { _ = preResp.Body.Close() }() - - if preResp.StatusCode != http.StatusOK { - return "", fmt.Errorf("preconnect status: %w", protect.StatusError(errPreconnectFailed, preResp, 1024)) - } - - var preconnectResp struct { - ConnectorURL string `json:"connectorUrl"` - } - if err := json.NewDecoder(preResp.Body).Decode(&preconnectResp); err != nil { - return "", fmt.Errorf("decode preconnect response: %w", err) - } - return preconnectResp.ConnectorURL, nil -} - -func joinRoom(ctx context.Context, roomID, password string) (*roomInfo, error) { - headers := anonymousHeaders() - connectorURL, err := preconnect(ctx, roomID, password, headers) - if err != nil { - return nil, err - } - return &roomInfo{ - RoomID: roomID, - Password: password, - ConnectorURL: connectorURL, - }, nil -} diff --git a/internal/auth/salutejazz/api_test.go b/internal/auth/salutejazz/api_test.go deleted file mode 100644 index f019a6f..0000000 --- a/internal/auth/salutejazz/api_test.go +++ /dev/null @@ -1,143 +0,0 @@ -package salutejazz - -import ( - "context" - "encoding/json" - "errors" - "net/http" - "net/http/httptest" - "testing" - - "github.com/openlibrecommunity/olcrtc/internal/auth" -) - -func withJazzAPIServer(t *testing.T, h http.Handler) { - t.Helper() - old := apiBase - srv := httptest.NewServer(h) - t.Cleanup(func() { - apiBase = old - srv.Close() - }) - apiBase = srv.URL -} - -func TestCreateMeetingAndPreconnect(t *testing.T) { - mux := http.NewServeMux() - mux.HandleFunc("POST /room/create-meeting", func(w http.ResponseWriter, r *http.Request) { - if r.Header.Get("X-Jazz-Authtype") != authTypeAnonymous { - t.Fatalf("missing auth header: %v", r.Header) - } - _ = json.NewEncoder(w).Encode(createResponse{RoomID: "room-1", Password: "pass"}) //nolint:gosec - }) - mux.HandleFunc("POST /room/room-1/preconnect", func(w http.ResponseWriter, _ *http.Request) { - _ = json.NewEncoder(w).Encode(map[string]string{connectorURLKey: testConnector}) - }) - - withJazzAPIServer(t, mux) - - headers := map[string]string{ - headerAuthType: authTypeAnonymous, - "Content-Type": "application/json", - } - created, err := createMeeting(context.Background(), headers) - if err != nil { - t.Fatalf("createMeeting() error = %v", err) - } - if created.RoomID != "room-1" || created.Password != "pass" { - t.Fatalf("createMeeting() = %+v", created) - } - - connector, err := preconnect(context.Background(), "room-1", "pass", headers) - if err != nil { - t.Fatalf("preconnect() error = %v", err) - } - if connector != testConnector { - t.Fatalf("preconnect() = %q", connector) - } -} - -const ( - testRoomID = "new-room" - testPassword = "new-pass" - testConnector = "wss://connector" - connectorURLKey = "connectorUrl" -) - -func TestCreateRoomAndJoinRoom(t *testing.T) { - mux := http.NewServeMux() - mux.HandleFunc("POST /room/create-meeting", func(w http.ResponseWriter, _ *http.Request) { - _ = json.NewEncoder(w).Encode(createResponse{RoomID: testRoomID, Password: testPassword}) //nolint:gosec - }) - mux.HandleFunc("POST /room/{id}/preconnect", func(w http.ResponseWriter, _ *http.Request) { - _ = json.NewEncoder(w).Encode(map[string]string{connectorURLKey: testConnector}) - }) - - withJazzAPIServer(t, mux) - - room, err := createRoom(context.Background()) - if err != nil { - t.Fatalf("createRoom() error = %v", err) - } - if room.RoomID != testRoomID || room.Password != testPassword || - room.ConnectorURL != testConnector { - t.Fatalf("createRoom() = %+v", room) - } - - room, err = joinRoom(context.Background(), "existing", "secret") - if err != nil { - t.Fatalf("joinRoom() error = %v", err) - } - if room.RoomID != "existing" || room.Password != "secret" || room.ConnectorURL != testConnector { - t.Fatalf("joinRoom() = %+v", room) - } -} - -func TestJazzAPIErrors(t *testing.T) { - mux := http.NewServeMux() - mux.HandleFunc("/room/create-meeting", func(w http.ResponseWriter, _ *http.Request) { - http.Error(w, "bad", http.StatusTeapot) - }) - mux.HandleFunc("/", func(w http.ResponseWriter, _ *http.Request) { - http.Error(w, "bad", http.StatusInternalServerError) - }) - - withJazzAPIServer(t, mux) - - if _, err := createMeeting(context.Background(), nil); !errors.Is(err, errCreateRoomFailed) { - t.Fatalf("createMeeting() error = %v, want %v", err, errCreateRoomFailed) - } - if _, err := preconnect(context.Background(), "room", "pass", nil); !errors.Is(err, errPreconnectFailed) { - t.Fatalf("preconnect() error = %v, want %v", err, errPreconnectFailed) - } -} - -func TestJazzIssue(t *testing.T) { - mux := http.NewServeMux() - mux.HandleFunc("POST /room/create-meeting", func(w http.ResponseWriter, _ *http.Request) { - _ = json.NewEncoder(w).Encode(createResponse{RoomID: testRoomID, Password: testPassword}) //nolint:gosec - }) - mux.HandleFunc("POST /room/{id}/preconnect", func(w http.ResponseWriter, _ *http.Request) { - _ = json.NewEncoder(w).Encode(map[string]string{connectorURLKey: testConnector}) - }) - - withJazzAPIServer(t, mux) - - p := Provider{} - creds, err := p.Issue(context.Background(), auth.Config{ - RoomURL: "any", - Name: "peer", - }) - if err != nil { - t.Fatalf("Issue() error = %v", err) - } - if creds.URL != testConnector { - t.Fatalf("creds.URL = %q", creds.URL) - } - if creds.Token != testRoomID { - t.Fatalf("creds.Token = %q", creds.Token) - } - if creds.Extra["password"] != testPassword { - t.Fatalf("creds.Extra[password] = %q", creds.Extra["password"]) - } -} diff --git a/internal/auth/salutejazz/salutejazz.go b/internal/auth/salutejazz/salutejazz.go deleted file mode 100644 index 6dd6abf..0000000 --- a/internal/auth/salutejazz/salutejazz.go +++ /dev/null @@ -1,70 +0,0 @@ -package salutejazz - -import ( - "context" - "fmt" - "strings" - - "github.com/openlibrecommunity/olcrtc/internal/auth" -) - -// Provider produces SaluteJazz credentials. -type Provider struct{} - -// Engine reports which engine consumes credentials from this auth provider. -func (Provider) Engine() string { return "salutejazz" } - -// DefaultServiceURL returns the SaluteJazz service URL. -func (Provider) DefaultServiceURL() string { return "https://bk.salutejazz.ru" } - -// Issue runs the SaluteJazz API flow and returns engine credentials. -// -// cfg.RoomURL accepts either an empty value (a new room is created on the -// fly, mirroring the legacy jazz provider) or ":". -func (Provider) Issue(ctx context.Context, cfg auth.Config) (auth.Credentials, error) { - roomRef := strings.TrimSpace(cfg.RoomURL) - var info *roomInfo - var err error - - switch roomRef { - case "", "any", "dummy": - info, err = createRoom(ctx) - if err != nil { - return auth.Credentials{}, fmt.Errorf("create room: %w", err) - } - default: - roomID, password, hasPassword := strings.Cut(roomRef, ":") - if !hasPassword { - return auth.Credentials{}, fmt.Errorf("%w: expected :", auth.ErrRoomIDRequired) - } - info, err = joinRoom(ctx, roomID, password) - if err != nil { - return auth.Credentials{}, fmt.Errorf("join room: %w", err) - } - } - - return auth.Credentials{ - URL: info.ConnectorURL, - Token: info.RoomID, - Extra: map[string]string{ - "password": info.Password, - "roomID": info.RoomID, - }, - }, nil -} - -// CreateRoom creates a new SaluteJazz room and returns ":". -// -// Returned format mirrors the legacy gen-mode output so existing -// subscriptions and tooling keep working. -func (Provider) CreateRoom(ctx context.Context, _ auth.Config) (string, error) { - info, err := createRoom(ctx) - if err != nil { - return "", fmt.Errorf("create room: %w", err) - } - return info.RoomID + ":" + info.Password, nil -} - -func init() { //nolint:gochecknoinits // auth registration is the canonical Go pattern for plugins - auth.Register("salutejazz", Provider{}) -} diff --git a/internal/config/config.go b/internal/config/config.go index 8df7058..2b3171f 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -79,7 +79,7 @@ type Failover struct { // Auth selects the auth provider. type Auth struct { - Provider string `yaml:"provider"` // telemost, jazz, wbstream, none + Provider string `yaml:"provider"` // telemost, wbstream, none } // Room identifies the conference room. @@ -112,7 +112,7 @@ type SOCKS struct { // Engine selects a direct SFU connection when Auth.Provider is "none". type Engine struct { - Name string `yaml:"name"` // livekit, goolom, salutejazz + Name string `yaml:"name"` // livekit, goolom, jitsi URL string `yaml:"url"` Token string `yaml:"token"` } diff --git a/internal/e2e/tunnel_test.go b/internal/e2e/tunnel_test.go index 24d82a0..dc060ce 100644 --- a/internal/e2e/tunnel_test.go +++ b/internal/e2e/tunnel_test.go @@ -21,7 +21,6 @@ import ( "github.com/openlibrecommunity/olcrtc/internal/app/session" "github.com/openlibrecommunity/olcrtc/internal/auth" - authSaluteJazz "github.com/openlibrecommunity/olcrtc/internal/auth/salutejazz" authWBStream "github.com/openlibrecommunity/olcrtc/internal/auth/wbstream" "github.com/openlibrecommunity/olcrtc/internal/client" "github.com/openlibrecommunity/olcrtc/internal/engine" @@ -74,11 +73,6 @@ var ( "datachannel,videochannel,seichannel,vp8channel", "comma-separated transports for real e2e", ) - realE2EJazzRoom = flag.String( //nolint:gochecknoglobals // package-level state intentional - "olcrtc.real-jazz-room", - "", - "SaluteJazz room for real e2e, format roomID:password; autogenerated when empty", - ) realE2ETelemostRoom = flag.String( //nolint:gochecknoglobals // package-level state intentional "olcrtc.real-telemost-room", "41514917109506", @@ -368,7 +362,7 @@ func registerFailingCarrier(t *testing.T) string { } func builtInCarrierNames() []string { - return []string{"jazz", "telemost", "wbstream", "jitsi"} //nolint:goconst // test literal, repetition is intentional + return []string{"telemost", "wbstream", "jitsi"} //nolint:goconst // test literal, repetition is intentional } func builtInTransportNames() []string { @@ -392,11 +386,6 @@ func realE2ECaseExpectation(carrierName, transportName string) realE2EExpectatio return realE2EExpectFail } return realE2EExpectPass - case "jazz": - if transportName == transportData { - return realE2EExpectPass - } - return realE2EExpectFail case "jitsi": // Jitsi colibri-ws bridge channel maps cleanly onto the // datachannel transport (raw bytes broadcast through @@ -453,30 +442,6 @@ func TestRealE2ECaseExpectation(t *testing.T) { transport string want realE2EExpectation }{ - { - name: "jazz datachannel is expected to pass", - carrier: "jazz", - transport: transportData, - want: realE2EExpectPass, - }, - { - name: "jazz videochannel is expected to fail", - carrier: "jazz", - transport: transportVideo, - want: realE2EExpectFail, - }, - { - name: "jazz seichannel is expected to fail", - carrier: "jazz", - transport: transportSEI, - want: realE2EExpectFail, - }, - { - name: "jazz vp8channel is expected to fail", - carrier: "jazz", - transport: transportVP8, - want: realE2EExpectFail, - }, { name: "telemost datachannel is expected to fail", carrier: "telemost", @@ -547,15 +512,6 @@ func realRoomURL(ctx context.Context, t *testing.T, carrierName string) string { t.Helper() switch carrierName { - case "jazz": - if *realE2EJazzRoom != "" { - return *realE2EJazzRoom - } - room, err := authSaluteJazz.Provider{}.CreateRoom(ctx, auth.Config{Name: "olcrtc-e2e-room"}) - if err != nil { - t.Skipf("skip jazz real e2e: create room failed: %v", err) - } - return room case "telemost": room := *realE2ETelemostRoom if room != "" && !strings.HasPrefix(room, "http://") && !strings.HasPrefix(room, "https://") { diff --git a/internal/engine/builtin/builtin.go b/internal/engine/builtin/builtin.go index 29d3b15..da52506 100644 --- a/internal/engine/builtin/builtin.go +++ b/internal/engine/builtin/builtin.go @@ -13,14 +13,12 @@ import ( "github.com/openlibrecommunity/olcrtc/internal/auth" authJitsi "github.com/openlibrecommunity/olcrtc/internal/auth/jitsi" - authSaluteJazz "github.com/openlibrecommunity/olcrtc/internal/auth/salutejazz" authTelemost "github.com/openlibrecommunity/olcrtc/internal/auth/telemost" authWBStream "github.com/openlibrecommunity/olcrtc/internal/auth/wbstream" "github.com/openlibrecommunity/olcrtc/internal/engine" - _ "github.com/openlibrecommunity/olcrtc/internal/engine/goolom" // register goolom engine via init - _ "github.com/openlibrecommunity/olcrtc/internal/engine/jitsi" // register jitsi engine via init - _ "github.com/openlibrecommunity/olcrtc/internal/engine/livekit" // register livekit engine via init - _ "github.com/openlibrecommunity/olcrtc/internal/engine/salutejazz" // register salutejazz engine via init + _ "github.com/openlibrecommunity/olcrtc/internal/engine/goolom" // register goolom engine via init + _ "github.com/openlibrecommunity/olcrtc/internal/engine/jitsi" // register jitsi engine via init + _ "github.com/openlibrecommunity/olcrtc/internal/engine/livekit" // register livekit engine via init ) // ErrCarrierNotFound is returned when an unregistered carrier name is requested. @@ -75,11 +73,10 @@ func Available() []string { return names } -// RegisterDefaults wires the built-in carriers: jitsi, telemost, jazz, wbstream +// RegisterDefaults wires the built-in carriers: jitsi, telemost, wbstream // and "none" (direct engine access). func RegisterDefaults() { registerEngineAuth("wbstream", authWBStream.Provider{}) - registerEngineAuth("jazz", authSaluteJazz.Provider{}) registerEngineAuth("telemost", authTelemost.Provider{}) registerEngineAuth("jitsi", authJitsi.Provider{}) registerDirect("none") diff --git a/internal/engine/engine.go b/internal/engine/engine.go index adb077a..67b9dc8 100644 --- a/internal/engine/engine.go +++ b/internal/engine/engine.go @@ -4,7 +4,7 @@ // byte/video primitives the rest of olcrtc consumes. // // Engines model the SFU protocol family (e.g. LiveKit, Goolom). Service- -// specific bits (e.g. WB / Jazz / Telemost API flows) live in the auth +// specific bits (e.g. WB / Telemost API flows) live in the auth // package, not here. package engine @@ -41,7 +41,7 @@ type Credentials struct { // Config is the runtime input to an engine factory. URL/Token are produced by // an auth provider (or supplied directly by the caller for "none" auth). // Extra carries engine-specific fields that don't fit the common shape -// (e.g. SaluteJazz needs a separate room password alongside the room ID). +// (e.g. providers that need metadata beyond URL/token can pass it here). // // Refresh, when set, is called by an engine whose protocol requires fresh // credentials on each reconnect (e.g. Goolom: every reconnect needs a new diff --git a/internal/engine/jitsi/jitsi.go b/internal/engine/jitsi/jitsi.go index af9df11..c19cfe6 100644 --- a/internal/engine/jitsi/jitsi.go +++ b/internal/engine/jitsi/jitsi.go @@ -12,7 +12,7 @@ // // The Jingle session-initiate is only delivered by Jicofo once at least one // other participant is present in the conference, mirroring the Telemost / -// SaluteJazz two-peer requirement that olcrtc already accommodates. +// two-peer tunnel model that olcrtc already accommodates. package jitsi import ( diff --git a/internal/engine/livekit/livekit.go b/internal/engine/livekit/livekit.go index 80b4aab..552a7e9 100644 --- a/internal/engine/livekit/livekit.go +++ b/internal/engine/livekit/livekit.go @@ -3,7 +3,7 @@ // // This engine is service-agnostic: it accepts a wss:// signaling URL and an // access token, and provides byte-stream + video-track primitives over a -// LiveKit room. Service-specific token acquisition (e.g. WB Stream, Jazz, +// LiveKit room. Service-specific token acquisition (e.g. WB Stream, // or a self-hosted LiveKit deployment) lives in the auth package. package livekit diff --git a/internal/engine/salutejazz/close_test.go b/internal/engine/salutejazz/close_test.go deleted file mode 100644 index 5d57182..0000000 --- a/internal/engine/salutejazz/close_test.go +++ /dev/null @@ -1,164 +0,0 @@ -package salutejazz - -import ( - "context" - "net/http" - "net/http/httptest" - "sync" - "testing" - "time" - - "github.com/gorilla/websocket" -) - -// TestCloseUnblocksHandleSignaling pins down the shutdown ordering: when a -// peer goroutine is parked in handleSignaling -> ws.ReadJSON, calling Close -// must close the WebSocket up front so ReadJSON returns immediately and the -// signaling loop exits within the closeWaitTimeout. The historical bug had -// Close call wg.Wait() BEFORE closing the WS, so handleSignaling stayed -// parked for the full timeout (and on flaky networks longer once pion's -// PeerConnection.Close kicked in too) — which on CI showed up as -// "tunnel goroutine did not stop: client" in the real e2e jazz matrix. -// -//nolint:cyclop // setup + handler + assertions naturally produces several branches in one test -func TestCloseUnblocksHandleSignaling(t *testing.T) { - upgrader := websocket.Upgrader{CheckOrigin: func(*http.Request) bool { return true }} - - // Server side parks on a read so it never closes the connection - // from its end, forcing the client-side ReadJSON to depend on - // shutdownWebSocket flipping the read deadline / closing the conn. - serverDone := make(chan struct{}) - srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - conn, err := upgrader.Upgrade(w, r, nil) - if err != nil { - t.Errorf("upgrade websocket: %v", err) - return - } - defer func() { - _ = conn.Close() - close(serverDone) - }() - _, _, _ = conn.ReadMessage() - })) - defer srv.Close() - - wsURL := "ws" + srv.URL[len("http"):] - dialer := websocket.Dialer{HandshakeTimeout: 2 * time.Second} - conn, resp, err := dialer.Dial(wsURL, nil) - if resp != nil && resp.Body != nil { - _ = resp.Body.Close() - } - if err != nil { - t.Fatalf("dial websocket: %v", err) - } - - s := &Session{ - ws: conn, - reconnectCh: make(chan struct{}, 1), - closeCh: make(chan struct{}), - sessionCloseCh: make(chan struct{}), - sendQueue: make(chan []byte, 1), - subscriberConn: make(chan struct{}), - publisherConn: make(chan struct{}), - videoNegotiated: make(chan struct{}), - } - - // Mirror Connect's bookkeeping for the signaling goroutine so - // wg.Wait blocks on it during Close. - signalingDone := make(chan struct{}) - s.wg.Add(1) - go func() { - defer s.wg.Done() - defer close(signalingDone) - s.handleSignaling(context.Background()) - }() - - start := time.Now() - if err := s.Close(); err != nil { - t.Fatalf("Close() error = %v", err) - } - elapsed := time.Since(start) - - // closeWaitTimeout is 2s; with the fix Close should return well under that - // because shutdownWebSocket trips ReadJSON's deadline up front. Allow some - // slack so this remains stable on slow CI runners but still fail loudly - // if the historical 2s wait creeps back in. - if elapsed > closeWaitTimeout-500*time.Millisecond { - t.Fatalf("Close() took %s, expected < %s; handleSignaling likely parked", elapsed, closeWaitTimeout) - } - - select { - case <-signalingDone: - case <-time.After(time.Second): - t.Fatal("handleSignaling did not exit after Close") - } - - // Drain the server side too so the test doesn't leak goroutines. - select { - case <-serverDone: - case <-time.After(time.Second): - } -} - -// TestShutdownWebSocketIsIdempotent guards the contract that Close can be -// called more than once (e.g. by both the carrier teardown path and a -// defer in tests) without panicking. gorilla/websocket's Close returns -// ErrCloseSent on the second call which we tolerate. -func TestShutdownWebSocketIsIdempotent(t *testing.T) { - upgrader := websocket.Upgrader{CheckOrigin: func(*http.Request) bool { return true }} - srv := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - conn, err := upgrader.Upgrade(w, r, nil) - if err != nil { - t.Errorf("upgrade websocket: %v", err) - return - } - defer func() { _ = conn.Close() }() - _, _, _ = conn.ReadMessage() - })) - defer srv.Close() - - wsURL := "ws" + srv.URL[len("http"):] - conn, resp, err := websocket.DefaultDialer.Dial(wsURL, nil) - if resp != nil && resp.Body != nil { - _ = resp.Body.Close() - } - if err != nil { - t.Fatalf("dial websocket: %v", err) - } - - s := &Session{ws: conn} - - var wg sync.WaitGroup - wg.Add(2) - go func() { defer wg.Done(); s.shutdownWebSocket() }() - go func() { defer wg.Done(); s.shutdownWebSocket() }() - wg.Wait() -} - -// TestCloseWithDeadlineDoesNotBlockOnStraggler pins down that a wedged -// PeerConnection.Close (modeled here as a never-returning closer) does not -// hold up Session.Close past its budget. The historical failure mode showed -// up in the real e2e matrix as "tunnel goroutine did not stop: client" when -// pion's TURN refresh storm kept the ICE agent alive long after the test -// asked it to shut down. -func TestCloseWithDeadlineDoesNotBlockOnStraggler(t *testing.T) { - deadline := 50 * time.Millisecond - block := make(chan struct{}) - t.Cleanup(func() { close(block) }) - closers := []func() error{ - func() error { return nil }, - func() error { <-block; return nil }, - } - - start := time.Now() - closeWithDeadline(closers, deadline) - elapsed := time.Since(start) - - if elapsed > deadline*4 { - t.Fatalf("closeWithDeadline blocked for %s, expected ~%s", elapsed, deadline) - } - if elapsed < deadline { - t.Fatalf("closeWithDeadline returned in %s before deadline %s; straggler ignored", - elapsed, deadline) - } -} diff --git a/internal/engine/salutejazz/datapacket.go b/internal/engine/salutejazz/datapacket.go deleted file mode 100644 index c833b41..0000000 --- a/internal/engine/salutejazz/datapacket.go +++ /dev/null @@ -1,144 +0,0 @@ -package salutejazz - -import ( - "encoding/binary" - "fmt" - "io" - - "github.com/google/uuid" -) - -func encodeVarint(value uint64) []byte { - buf := make([]byte, binary.MaxVarintLen64) - n := binary.PutUvarint(buf, value) - return buf[:n] -} - -func encodeField(fieldNumber int, wireType int, data []byte) []byte { - tag := encodeVarint(uint64(fieldNumber)<<3 | uint64(wireType)) //nolint:gosec,lll // G115: bounded conversion verified by surrounding logic - switch wireType { - case 2: - length := encodeVarint(uint64(len(data))) - result := make([]byte, 0, len(tag)+len(length)+len(data)) - result = append(result, tag...) - result = append(result, length...) - result = append(result, data...) - return result - default: - result := make([]byte, 0, len(tag)+len(data)) - result = append(result, tag...) - result = append(result, data...) - return result - } -} - -// EncodeDataPacket wraps a payload into a SaluteJazz data packet. -func EncodeDataPacket(payload []byte) []byte { - msgID := uuid.New().String() - - userFields := encodeField(2, 2, payload) - userFields = append(userFields, encodeField(8, 2, []byte(msgID))...) - - dp := encodeField(1, 0, encodeVarint(0)) - dp = append(dp, encodeField(2, 2, userFields)...) - - return dp -} - -func readVarint(r io.ByteReader) (uint64, error) { - val, err := binary.ReadUvarint(r) - if err != nil { - return 0, fmt.Errorf("read uvarint: %w", err) - } - return val, nil -} - -// DecodeDataPacket extracts the payload from a SaluteJazz data packet. -func DecodeDataPacket(raw []byte) ([]byte, bool) { - userData, ok := parseFields(raw, 2) - if !ok { - return nil, false - } - - payload, ok := parseFields(userData, 2) - return payload, ok -} - -func parseFields(data []byte, targetField int) ([]byte, bool) { - reader := &byteReader{data: data, pos: 0} - var result []byte - - for reader.pos < len(reader.data) { - tagVal, err := readVarint(reader) - if err != nil { - break - } - - fieldNumber := int(tagVal >> 3) - wireType := int(tagVal & 0x07) - - fieldData, ok := handleWireType(reader, wireType, len(data)) - if !ok { - return result, len(result) > 0 - } - - if fieldNumber == targetField && wireType == 2 { - result = fieldData - } - } - - return result, len(result) > 0 -} - -func handleWireType(reader *byteReader, wireType int, dataLen int) ([]byte, bool) { - switch wireType { - case 0: - _, _ = readVarint(reader) - return nil, true - case 2: - length, err := readVarint(reader) - if err != nil { - return nil, false - } - if length > uint64(dataLen)-uint64(reader.pos) { //nolint:gosec,lll // G115: bounded conversion verified by surrounding logic - return nil, false - } - fieldData := make([]byte, length) - n, err := reader.Read(fieldData) - if err != nil || uint64(n) != length { //nolint:gosec // G115: bounded conversion verified by surrounding logic - return nil, false - } - return fieldData, true - case 1: - reader.pos += 8 - return nil, true - case 5: - reader.pos += 4 - return nil, true - default: - return nil, false - } -} - -type byteReader struct { - data []byte - pos int -} - -func (b *byteReader) ReadByte() (byte, error) { - if b.pos >= len(b.data) { - return 0, io.EOF - } - c := b.data[b.pos] - b.pos++ - return c, nil -} - -func (b *byteReader) Read(p []byte) (int, error) { - if b.pos >= len(b.data) { - return 0, io.EOF - } - n := copy(p, b.data[b.pos:]) - b.pos += n - return n, nil -} diff --git a/internal/engine/salutejazz/datapacket_test.go b/internal/engine/salutejazz/datapacket_test.go deleted file mode 100644 index a0c1561..0000000 --- a/internal/engine/salutejazz/datapacket_test.go +++ /dev/null @@ -1,70 +0,0 @@ -package salutejazz - -import ( - "bytes" - "errors" - "io" - "testing" -) - -func TestDataPacketRoundTrip(t *testing.T) { - payload := []byte("hello jazz") - raw := EncodeDataPacket(payload) - - got, ok := DecodeDataPacket(raw) - if !ok { - t.Fatal("DecodeDataPacket() ok = false") - } - if !bytes.Equal(got, payload) { - t.Fatalf("DecodeDataPacket() = %q, want %q", got, payload) - } -} - -func TestDecodeDataPacketRejectsMalformedPackets(t *testing.T) { - tests := [][]byte{ - nil, - {0xff}, - encodeField(1, 0, encodeVarint(0)), - {byte(2<<3 | 2), 10, 1}, - {byte(3<<3 | 7), 0}, - } - - for _, raw := range tests { - if payload, ok := DecodeDataPacket(raw); ok { - t.Fatalf("DecodeDataPacket(%v) = (%q, true), want false", raw, payload) - } - } -} - -func TestParseFieldsSkipsSupportedNonTargetWireTypes(t *testing.T) { - data := encodeField(1, 0, encodeVarint(150)) - data = append(data, encodeField(3, 1, []byte("12345678"))...) - data = append(data, encodeField(4, 5, []byte("1234"))...) - data = append(data, encodeField(2, 2, []byte("target"))...) - - got, ok := parseFields(data, 2) - if !ok || string(got) != "target" { - t.Fatalf("parseFields() = (%q, %v), want target", got, ok) - } -} - -func TestByteReader(t *testing.T) { - r := &byteReader{data: []byte{1, 2, 3}} - b, err := r.ReadByte() - if err != nil || b != 1 { - t.Fatalf("ReadByte() = (%d, %v), want (1, nil)", b, err) - } - - buf := make([]byte, 4) - n, err := r.Read(buf) - if err != nil || n != 2 || !bytes.Equal(buf[:n], []byte{2, 3}) { - t.Fatalf("Read() = (%d, %v, %v), want two bytes", n, err, buf[:n]) - } - - if _, err := r.ReadByte(); !errors.Is(err, io.EOF) { - t.Fatalf("ReadByte() error = %v, want EOF", err) - } - if n, err := r.Read(buf); !errors.Is(err, io.EOF) || n != 0 { - t.Fatalf("Read() = (%d, %v), want (0, EOF)", n, err) - } -} diff --git a/internal/engine/salutejazz/salutejazz.go b/internal/engine/salutejazz/salutejazz.go deleted file mode 100644 index 4831d41..0000000 --- a/internal/engine/salutejazz/salutejazz.go +++ /dev/null @@ -1,1205 +0,0 @@ -// Package salutejazz implements an engine.Session backed by the SaluteJazz -// signaling protocol (WS + SDP with publisher/subscriber peer connection -// split). The on-wire protocol is Sber-specific; the media plane is -// straightforward WebRTC. Token acquisition lives in the auth package. -package salutejazz - -import ( - "context" - "errors" - "fmt" - "sync" - "sync/atomic" - "time" - - "github.com/google/uuid" - "github.com/gorilla/websocket" - "github.com/openlibrecommunity/olcrtc/internal/engine" - "github.com/openlibrecommunity/olcrtc/internal/logger" - "github.com/openlibrecommunity/olcrtc/internal/protect" - "github.com/pion/webrtc/v4" - "github.com/pion/webrtc/v4/pkg/media" -) - -const ( - maxDataChannelMessageSize = 12288 - sendDelay = 2 * time.Millisecond - - keyRoomID = "roomId" - keyEvent = "event" - keyRequestID = "requestId" - keyPayload = "payload" - keyGroupID = "groupId" - - eventMediaIn = "media-in" - - payloadMethod = "method" - payloadTrack = "track" - payloadType = "type" - payloadDesc = "description" - payloadSDP = "sdp" - payloadAnswer = "answer" - payloadOffer = "offer" - payloadMuted = "muted" - methodOffer = "rtc:offer" - - trackTypeAudio = "AUDIO" - trackTypeVideo = "VIDEO" - trackSourceMic = "MICROPHONE" - trackSourceCam = "CAMERA" - - credentialKeyPassword = "password" - - defaultSendQueueSize = 5000 - mediaReadyTimeout = 30 * time.Second - dataChannelTimeout = 30 * time.Second - wsReadTimeout = 60 * time.Second - wsHandshakeTimeout = 15 * time.Second - sendQueueTimeout = 50 * time.Millisecond - closeWaitTimeout = 2 * time.Second - pcCloseTimeout = 3 * time.Second - subscriberOfferGap = 300 * time.Millisecond - audioFrameDuration = 20 * time.Millisecond -) - -var opusSilenceFrame = []byte{0xf8, 0xff, 0xfe} //nolint:gochecknoglobals // static Opus silence frame - -var ( - // ErrPublisherNotInitialized is returned when the publisher peer connection is not set up. - ErrPublisherNotInitialized = errors.New("publisher peer connection not initialized") - // ErrSubscriberMediaTimeout is returned when the subscriber media is not ready in time. - ErrSubscriberMediaTimeout = errors.New("subscriber media timeout") - // ErrPublisherMediaTimeout is returned when the publisher media is not ready in time. - ErrPublisherMediaTimeout = errors.New("publisher media timeout") - // ErrDataChannelTimeout is returned when the data channel fails to open in time. - ErrDataChannelTimeout = errors.New("datachannel timeout") - // ErrDataChannelNotReady is returned when send is called before the data channel is open. - ErrDataChannelNotReady = errors.New("datachannel not ready") - // ErrSendQueueClosed is returned when send is called after Close. - ErrSendQueueClosed = errors.New("send queue closed") - // ErrSendQueueTimeout is returned when the send queue cannot accept new data in time. - ErrSendQueueTimeout = errors.New("send queue timeout") - // ErrURLRequired is returned when no connector URL was supplied. - ErrURLRequired = errors.New("salutejazz connector URL required") - // ErrRoomIDRequired is returned when no room ID was supplied. - ErrRoomIDRequired = errors.New("salutejazz room ID required") -) - -// Session is the SaluteJazz engine handle. -type Session struct { - name string - connectorURL string - roomID string - password string - ws *websocket.Conn - wsMu sync.Mutex - pcSub *webrtc.PeerConnection - pcPub *webrtc.PeerConnection - dc *webrtc.DataChannel - onData func([]byte) - onReconnect func(*webrtc.DataChannel) - shouldReconnect func() bool - reconnectCh chan struct{} - closeCh chan struct{} - closed atomic.Bool - reconnecting atomic.Bool - sendQueue chan []byte - sendQueueClosed atomic.Bool - onEnded func(string) - sessionCloseCh chan struct{} - videoTrackMu sync.RWMutex - videoTracks []webrtc.TrackLocal - audioTrack *webrtc.TrackLocalStaticSample - onVideoTrack func(*webrtc.TrackRemote, *webrtc.RTPReceiver) - subscriberReady atomic.Bool - publisherReady atomic.Bool - publisherStarted atomic.Bool - cameraUnmuted atomic.Bool - videoOffered atomic.Bool - subscriberConn chan struct{} - publisherConn chan struct{} - videoNegotiated chan struct{} - wg sync.WaitGroup - groupIDMu sync.RWMutex - groupID string -} - -// New creates a new SaluteJazz engine session. -// -// cfg.URL is the SaluteJazz connector WebSocket URL. cfg.Token carries the -// room ID; cfg.Extra["password"] carries the room password. These are -// produced by the salutejazz auth provider. -func New(_ context.Context, cfg engine.Config) (engine.Session, error) { - if cfg.URL == "" { - return nil, ErrURLRequired - } - // Token field encodes the room ID for this engine. - roomID := cfg.Token - if roomID == "" { - return nil, ErrRoomIDRequired - } - password := "" - if cfg.Extra != nil { - password = cfg.Extra[credentialKeyPassword] - } - - return &Session{ - name: cfg.Name, - connectorURL: cfg.URL, - roomID: roomID, - password: password, - onData: cfg.OnData, - reconnectCh: make(chan struct{}, 1), - closeCh: make(chan struct{}), - sessionCloseCh: make(chan struct{}), - sendQueue: make(chan []byte, defaultSendQueueSize), - subscriberConn: make(chan struct{}), - publisherConn: make(chan struct{}), - videoNegotiated: make(chan struct{}), - }, nil -} - -// Capabilities reports what this engine can do. -func (s *Session) Capabilities() engine.Capabilities { - return engine.Capabilities{ByteStream: true, VideoTrack: true} -} - -func (s *Session) resetMediaState() { - s.subscriberReady.Store(false) - s.publisherReady.Store(false) - s.publisherStarted.Store(false) - s.cameraUnmuted.Store(false) - s.videoOffered.Store(false) - s.subscriberConn = make(chan struct{}) - s.publisherConn = make(chan struct{}) - s.videoNegotiated = make(chan struct{}) - s.audioTrack = nil -} - -func closeSignal(ch chan struct{}) { - select { - case <-ch: - default: - close(ch) - } -} - -func (s *Session) hasLocalVideoTracks() bool { - s.videoTrackMu.RLock() - defer s.videoTrackMu.RUnlock() - return len(s.videoTracks) > 0 -} - -func (s *Session) videoTrackHandler() func(*webrtc.TrackRemote, *webrtc.RTPReceiver) { - s.videoTrackMu.RLock() - defer s.videoTrackMu.RUnlock() - return s.onVideoTrack -} - -func (s *Session) attachPendingVideoTracks() error { - s.videoTrackMu.Lock() - defer s.videoTrackMu.Unlock() - - if len(s.videoTracks) > 0 { - if err := s.ensurePublisherAudioTrackLocked(); err != nil { - return err - } - for _, track := range s.videoTracks { - if track == nil || track.Kind() != webrtc.RTPCodecTypeVideo { - continue - } - if _, err := s.pcPub.AddTrack(track); err != nil { - return fmt.Errorf("add video track: %w", err) - } - s.videoOffered.Store(true) - } - } - return nil -} - -func (s *Session) ensurePublisherAudioTrackLocked() error { - if s.audioTrack != nil { - return nil - } - - track, err := webrtc.NewTrackLocalStaticSample( - webrtc.RTPCodecCapability{ - MimeType: webrtc.MimeTypeOpus, - ClockRate: 48000, - Channels: 2, - }, - "microphone", - "olcrtc", - ) - if err != nil { - return fmt.Errorf("create audio track: %w", err) - } - if _, err := s.pcPub.AddTrack(track); err != nil { - return fmt.Errorf("add audio track: %w", err) - } - s.audioTrack = track - - s.wg.Add(1) - go s.writeAudioSilence(track) - return nil -} - -func (s *Session) writeAudioSilence(track *webrtc.TrackLocalStaticSample) { - defer s.wg.Done() - - ticker := time.NewTicker(audioFrameDuration) - defer ticker.Stop() - - for { - select { - case <-s.closeCh: - return - case <-ticker.C: - _ = track.WriteSample(media.Sample{ - Data: opusSilenceFrame, - Duration: audioFrameDuration, - }) - } - } -} - -func defaultWebRTCConfig() webrtc.Configuration { - return webrtc.Configuration{ - ICEServers: []webrtc.ICEServer{}, - SDPSemantics: webrtc.SDPSemanticsUnifiedPlan, - BundlePolicy: webrtc.BundlePolicyMaxBundle, - } -} - -func (s *Session) buildAPI() *webrtc.API { - se := webrtc.SettingEngine{} - if protect.Protector != nil { - se.SetICEProxyDialer(protect.NewProxyDialer()) - } - se.LoggerFactory = logger.NewPionLoggerFactory() - return webrtc.NewAPI(webrtc.WithSettingEngine(se)) -} - -func (s *Session) createPeerConnections(api *webrtc.API, config webrtc.Configuration) error { - var err error - s.pcSub, err = api.NewPeerConnection(config) - if err != nil { - return fmt.Errorf("create subscriber pc: %w", err) - } - s.pcSub.OnConnectionStateChange(s.onSubscriberConnectionStateChange) - s.pcSub.OnTrack(func(track *webrtc.TrackRemote, receiver *webrtc.RTPReceiver) { - if track.Kind() != webrtc.RTPCodecTypeVideo { - return - } - logger.Infof("[salutejazz] remote video track: codec=%s stream=%s track=%s", - track.Codec().MimeType, track.StreamID(), track.ID()) - if cb := s.videoTrackHandler(); cb != nil { - cb(track, receiver) - } - }) - s.pcSub.OnICECandidate(func(candidate *webrtc.ICECandidate) { - s.sendICECandidate(candidate, "SUBSCRIBER") - }) - - s.pcPub, err = api.NewPeerConnection(config) - if err != nil { - return fmt.Errorf("create publisher pc: %w", err) - } - s.pcPub.OnConnectionStateChange(s.onPublisherConnectionStateChange) - s.pcPub.OnICECandidate(func(candidate *webrtc.ICECandidate) { - s.sendICECandidate(candidate, "PUBLISHER") - }) - return nil -} - -func (s *Session) setGroupID(groupID string) { - s.groupIDMu.Lock() - s.groupID = groupID - s.groupIDMu.Unlock() -} - -func (s *Session) getGroupID() string { - s.groupIDMu.RLock() - defer s.groupIDMu.RUnlock() - return s.groupID -} - -func (s *Session) createDataChannel() (chan struct{}, error) { - var err error - s.dc, err = s.pcPub.CreateDataChannel("_reliable", &webrtc.DataChannelInit{ - Ordered: func() *bool { v := true; return &v }(), - }) - if err != nil { - return nil, fmt.Errorf("create datachannel: %w", err) - } - dcReady := make(chan struct{}) - s.setupDataChannelHandlers(dcReady) - return dcReady, nil -} - -func (s *Session) waitForReady(ctx context.Context, dcReady chan struct{}) error { - if dcReady != nil { - select { - case <-dcReady: - return nil - case <-time.After(dataChannelTimeout): - return ErrDataChannelTimeout - case <-ctx.Done(): - return fmt.Errorf("connect canceled: %w", ctx.Err()) - } - } - return s.waitForMediaReady(ctx, mediaReadyTimeout) -} - -// Connect starts the WebRTC connection process. -func (s *Session) Connect(ctx context.Context) error { - s.closed.Store(false) - s.resetMediaState() - - api := s.buildAPI() - config := defaultWebRTCConfig() - - if err := s.createPeerConnections(api, config); err != nil { - return err - } - if err := s.attachPendingVideoTracks(); err != nil { - return err - } - - var dcReady chan struct{} - if s.onData != nil { - var err error - dcReady, err = s.createDataChannel() - if err != nil { - return err - } - } - - if err := s.dialWebSocket(); err != nil { - return err - } - if err := s.sendJoin(); err != nil { - return err - } - - s.wg.Add(1) - go func() { - defer s.wg.Done() - s.handleSignaling(ctx) - }() - - return s.waitForReady(ctx, dcReady) -} - -func (s *Session) waitForMediaReady(ctx context.Context, timeout time.Duration) error { - timer := time.NewTimer(timeout) - defer timer.Stop() - - select { - case <-s.subscriberConn: - case <-timer.C: - return ErrSubscriberMediaTimeout - case <-ctx.Done(): - return fmt.Errorf("connect cancelled: %w", ctx.Err()) - } - - if !s.hasLocalVideoTracks() { - return nil - } - - select { - case <-s.videoNegotiated: - case <-timer.C: - return ErrPublisherMediaTimeout - case <-ctx.Done(): - return fmt.Errorf("connect cancelled: %w", ctx.Err()) - } - return nil -} - -func (s *Session) dialWebSocket() error { - wsDialer := protect.NewWebSocketDialer(wsHandshakeTimeout) - - ws, resp, err := wsDialer.Dial(s.connectorURL, nil) - if err != nil { - return fmt.Errorf("dial websocket: %w", err) - } - if resp != nil && resp.Body != nil { - _ = resp.Body.Close() - } - - s.ws = ws - ws.SetPongHandler(func(string) error { - _ = ws.SetReadDeadline(time.Now().Add(wsReadTimeout)) - return nil - }) - _ = ws.SetReadDeadline(time.Now().Add(wsReadTimeout)) - return nil -} - -func (s *Session) sendJoin() error { - joinMsg := map[string]any{ - keyRoomID: s.roomID, - keyEvent: "join", - keyRequestID: uuid.New().String(), - keyPayload: map[string]any{ - "password": s.password, - "participantName": s.name, - "supportedFeatures": map[string]any{ - "attachedRooms": true, - "sessionGroups": true, - "transcription": true, - }, - "isSilent": false, - }, - } - - s.wsMu.Lock() - defer s.wsMu.Unlock() - if err := s.ws.WriteJSON(joinMsg); err != nil { - return fmt.Errorf("write join json: %w", err) - } - return nil -} - -func (s *Session) setupDataChannelHandlers(dcReady chan struct{}) { - s.dc.OnOpen(func() { - logger.Verbosef("[salutejazz] Publisher DC opened: %s", s.dc.Label()) - s.wg.Add(1) - go func() { - defer s.wg.Done() - s.processSendQueue() - }() - close(dcReady) - }) - - s.dc.OnClose(func() { - logger.Verbosef("[salutejazz] Publisher DC closed") - if !s.closed.Load() { - s.queueReconnect() - } - }) - - s.dc.OnMessage(func(msg webrtc.DataChannelMessage) { - s.handleIncomingMessage(msg.Data, "publisher") - }) - - s.pcSub.OnDataChannel(func(dc *webrtc.DataChannel) { - logger.Verbosef("[salutejazz] Received subscriber DataChannel: %s", dc.Label()) - if dc.Label() != "_reliable" { - return - } - if s.onData != nil { - dc.OnMessage(func(msg webrtc.DataChannelMessage) { - s.handleIncomingMessage(msg.Data, "subscriber") - }) - } - }) -} - -func (s *Session) onSubscriberConnectionStateChange(state webrtc.PeerConnectionState) { - switch state { - case webrtc.PeerConnectionStateConnected: - s.subscriberReady.Store(true) - closeSignal(s.subscriberConn) - case webrtc.PeerConnectionStateDisconnected, webrtc.PeerConnectionStateFailed: - s.subscriberReady.Store(false) - if !s.closed.Load() { - s.queueReconnect() - } - case webrtc.PeerConnectionStateClosed: - s.subscriberReady.Store(false) - case webrtc.PeerConnectionStateUnknown, - webrtc.PeerConnectionStateNew, - webrtc.PeerConnectionStateConnecting: - } -} - -func (s *Session) onPublisherConnectionStateChange(state webrtc.PeerConnectionState) { - switch state { - case webrtc.PeerConnectionStateConnected: - s.publisherReady.Store(true) - closeSignal(s.publisherConn) - case webrtc.PeerConnectionStateDisconnected, webrtc.PeerConnectionStateFailed: - s.publisherReady.Store(false) - if !s.closed.Load() { - s.queueReconnect() - } - case webrtc.PeerConnectionStateClosed: - s.publisherReady.Store(false) - case webrtc.PeerConnectionStateUnknown, - webrtc.PeerConnectionStateNew, - webrtc.PeerConnectionStateConnecting: - } -} - -func (s *Session) handleIncomingMessage(data []byte, source string) { - logger.Verbosef("[salutejazz] Received %d bytes on %s DC (raw)", len(data), source) - - payload, ok := DecodeDataPacket(data) - if !ok { - logger.Debugf("[salutejazz] Failed to decode DataPacket, trying raw") - if s.onData != nil && len(data) > 0 { - s.onData(data) - } - return - } - - logger.Verbosef("[salutejazz] Decoded DataPacket: %d bytes payload", len(payload)) - if s.onData != nil && len(payload) > 0 { - s.onData(payload) - } -} - -func (s *Session) handleSignaling(_ context.Context) { - for { - var msg map[string]any - if err := s.ws.ReadJSON(&msg); err != nil { - if !s.closed.Load() { - logger.Debugf("ws read error: %v", err) - s.queueReconnect() - } - return - } - - s.updateWSDeadline() - - event, _ := msg[keyEvent].(string) - payload, _ := msg[keyPayload].(map[string]any) - - switch event { - case "join-response": - s.handleJoinResponse(payload) - case "media-out": - s.handleMediaOut(payload) - } - } -} - -func (s *Session) handleJoinResponse(payload map[string]any) { - group, _ := payload["participantGroup"].(map[string]any) - groupID, _ := group["groupId"].(string) - s.setGroupID(groupID) - logger.Verbosef("[salutejazz] peer joined: groupId=%s", groupID) -} - -func (s *Session) handleMediaOut(payload map[string]any) { - method, _ := payload["method"].(string) - - switch method { - case "rtc:config": - s.handleRTCConfig(payload) - case "rtc:join": - logger.Verbosef("[salutejazz] rtc:join received") - case "rtc:offer": - s.handleSubscriberOffer(payload) - case "rtc:answer": - s.handlePublisherAnswer(payload) - case "rtc:ice": - s.handleICE(payload) - case "rtc:participants:update": - s.handleParticipantsUpdate(payload) - } -} - -func (s *Session) handleRTCConfig(payload map[string]any) { - config, _ := payload["configuration"].(map[string]any) - servers, _ := config["iceServers"].([]any) - - var iceServers []webrtc.ICEServer - for _, srv := range servers { - server, _ := srv.(map[string]any) - urls, _ := server["urls"].([]any) - username, _ := server["username"].(string) - credential, _ := server["credential"].(string) - - var urlStrs []string - for _, u := range urls { - if urlStr, ok := u.(string); ok && urlStr != "" { - urlStrs = append(urlStrs, urlStr) - } - } - - if len(urlStrs) > 0 { - iceServers = append(iceServers, webrtc.ICEServer{ - URLs: urlStrs, - Username: username, - Credential: credential, - }) - } - } - - if len(iceServers) > 0 { - newConfig := webrtc.Configuration{ - ICEServers: iceServers, - SDPSemantics: webrtc.SDPSemanticsUnifiedPlan, - BundlePolicy: webrtc.BundlePolicyMaxBundle, - } - _ = s.pcSub.SetConfiguration(newConfig) - _ = s.pcPub.SetConfiguration(newConfig) - } -} - -func (s *Session) handleSubscriberOffer(payload map[string]any) { - desc, _ := payload[payloadDesc].(map[string]any) - sdp, _ := desc[payloadSDP].(string) - - if err := s.pcSub.SetRemoteDescription(webrtc.SessionDescription{ - Type: webrtc.SDPTypeOffer, - SDP: sdp, - }); err != nil { - logger.Debugf("set remote desc error: %v", err) - return - } - - answer, err := s.pcSub.CreateAnswer(nil) - if err != nil { - logger.Debugf("create answer error: %v", err) - return - } - - if err := s.pcSub.SetLocalDescription(answer); err != nil { - logger.Debugf("set local desc error: %v", err) - return - } - - s.wsMu.Lock() - _ = s.ws.WriteJSON(map[string]any{ - keyRoomID: s.roomID, - keyEvent: eventMediaIn, - keyGroupID: s.getGroupID(), - keyRequestID: uuid.New().String(), - keyPayload: map[string]any{ - payloadMethod: "rtc:answer", - payloadDesc: map[string]any{ - payloadType: payloadAnswer, - payloadSDP: answer.SDP, - }, - }, - }) - s.wsMu.Unlock() - - time.Sleep(subscriberOfferGap) - if s.publisherStarted.CompareAndSwap(false, true) { - s.sendPublisherOffer() - } -} - -func (s *Session) sendPublisherOffer() { - if err := s.sendPublisherAudioTrackAdd(); err != nil { - logger.Debugf("send publisher track add error: %v", err) - return - } - if err := s.sendPublisherVideoTrackAdds(); err != nil { - logger.Debugf("send publisher video track add error: %v", err) - return - } - - offer, err := s.pcPub.CreateOffer(nil) - if err != nil { - logger.Debugf("create pub offer error: %v", err) - return - } - - if err := s.pcPub.SetLocalDescription(offer); err != nil { - logger.Debugf("set local pub desc error: %v", err) - return - } - - logger.Infof("[salutejazz] send publisher offer audio=%t video=%t", s.publisherHasAudioTrack(), s.videoOffered.Load()) - s.wsMu.Lock() - _ = s.ws.WriteJSON(map[string]any{ - keyRoomID: s.roomID, - keyEvent: "media-in", - "groupId": s.getGroupID(), - keyRequestID: uuid.New().String(), - keyPayload: map[string]any{ - payloadMethod: methodOffer, - payloadDesc: map[string]any{ - payloadType: payloadOffer, - payloadSDP: offer.SDP, - }, - }, - }) - s.wsMu.Unlock() -} - -func (s *Session) sendPublisherAudioTrackAdd() error { - s.videoTrackMu.RLock() - hasAudioTrack := s.audioTrack != nil - s.videoTrackMu.RUnlock() - - if hasAudioTrack { - return s.sendPublisherTrackAdd(trackTypeAudio, trackSourceMic, true) - } - return nil -} - -func (s *Session) sendPublisherTrackAdd(trackType, source string, muted bool) error { - logger.Infof("[salutejazz] send track add type=%s source=%s muted=%t", trackType, source, muted) - - s.wsMu.Lock() - defer s.wsMu.Unlock() - - if err := s.ws.WriteJSON(map[string]any{ - keyRoomID: s.roomID, - keyEvent: eventMediaIn, - keyGroupID: s.getGroupID(), - keyRequestID: uuid.New().String(), - keyPayload: map[string]any{ - payloadMethod: "rtc:track:add", - "cid": uuid.New().String(), - payloadTrack: map[string]any{ - payloadType: trackType, - "source": source, - "muted": muted, - }, - }, - }); err != nil { - return fmt.Errorf("write track add json: %w", err) - } - return nil -} - -func (s *Session) sendPublisherVideoTrackAdds() error { - s.videoTrackMu.RLock() - tracks := append([]webrtc.TrackLocal(nil), s.videoTracks...) - s.videoTrackMu.RUnlock() - - for _, track := range tracks { - if track == nil || track.Kind() != webrtc.RTPCodecTypeVideo { - continue - } - if err := s.sendPublisherTrackAdd(trackTypeVideo, trackSourceCam, true); err != nil { - return err - } - } - return nil -} - -func (s *Session) publisherHasAudioTrack() bool { - s.videoTrackMu.RLock() - defer s.videoTrackMu.RUnlock() - return s.audioTrack != nil -} - -func (s *Session) handleParticipantsUpdate(payload map[string]any) { - if !s.hasLocalVideoTracks() || !s.videoOffered.Load() { - return - } - - track, ok := publisherCameraTrack(payload) - if !ok { - logger.Infof("[salutejazz] participants update without local publisher camera track") - return - } - - sid, _ := track["sid"].(string) - if muted, _ := track[payloadMuted].(bool); !muted { - logger.Infof("[salutejazz] publisher camera already unmuted sid=%s", sid) - s.cameraUnmuted.Store(true) - return - } - - logger.Infof("[salutejazz] publisher camera track sid=%s muted=true, sending unmute", sid) - if sid == "" || !s.cameraUnmuted.CompareAndSwap(false, true) { - return - } - if err := s.sendTrackMuted(sid, false); err != nil { - logger.Debugf("[salutejazz] send camera unmute error: %v", err) - } -} - -func publisherCameraTrack(payload map[string]any) (map[string]any, bool) { - update, _ := payload["update"].(map[string]any) - participants, _ := update["participants"].([]any) - for _, rawParticipant := range participants { - participant, _ := rawParticipant.(map[string]any) - if isPublisher, ok := participant["isPublisher"].(bool); ok && !isPublisher { - continue - } - - tracks, _ := participant["tracks"].([]any) - for _, rawTrack := range tracks { - track, _ := rawTrack.(map[string]any) - trackType, _ := track[payloadType].(string) - source, _ := track["source"].(string) - if trackType != trackTypeVideo || source != trackSourceCam { - continue - } - - return track, true - } - } - - return nil, false -} - -func (s *Session) sendTrackMuted(sid string, muted bool) error { - logger.Infof("[salutejazz] send track muted sid=%s muted=%t", sid, muted) - - s.wsMu.Lock() - defer s.wsMu.Unlock() - - if err := s.ws.WriteJSON(map[string]any{ - keyRoomID: s.roomID, - keyEvent: eventMediaIn, - keyGroupID: s.getGroupID(), - keyRequestID: uuid.New().String(), - keyPayload: map[string]any{ - payloadMethod: "rtc:track:muted", - "mute": map[string]any{ - "sid": sid, - payloadMuted: muted, - }, - }, - }); err != nil { - return fmt.Errorf("write track muted json: %w", err) - } - return nil -} - -func (s *Session) sendICECandidate(candidate *webrtc.ICECandidate, target string) { - if candidate == nil { - return - } - - groupID := s.getGroupID() - if groupID == "" { - logger.Debugf("[salutejazz] drop local ICE candidate before group id target=%s", target) - return - } - - s.wsMu.Lock() - defer s.wsMu.Unlock() - if s.ws == nil || s.closed.Load() { - return - } - - if err := s.ws.WriteJSON(map[string]any{ - keyRoomID: s.roomID, - keyEvent: eventMediaIn, - keyGroupID: groupID, - keyRequestID: uuid.New().String(), - keyPayload: map[string]any{ - payloadMethod: "rtc:ice", - "rtcIceCandidates": []any{jazzICECandidatePayload(candidate.ToJSON(), target)}, - }, - }); err != nil { - logger.Debugf("[salutejazz] send local ICE candidate error: %v", err) - } -} - -func jazzICECandidatePayload(candidate webrtc.ICECandidateInit, target string) map[string]any { - sdpMid := "" - if candidate.SDPMid != nil { - sdpMid = *candidate.SDPMid - } - sdpMLineIndex := uint16(0) - if candidate.SDPMLineIndex != nil { - sdpMLineIndex = *candidate.SDPMLineIndex - } - usernameFragment := "" - if candidate.UsernameFragment != nil { - usernameFragment = *candidate.UsernameFragment - } - - return map[string]any{ - "candidate": candidate.Candidate, - "sdpMid": sdpMid, - "sdpMLineIndex": sdpMLineIndex, - "usernameFragment": usernameFragment, - "target": target, - } -} - -func (s *Session) handlePublisherAnswer(payload map[string]any) { - desc, _ := payload[payloadDesc].(map[string]any) - sdp, _ := desc[payloadSDP].(string) - - if err := s.pcPub.SetRemoteDescription(webrtc.SessionDescription{ - Type: webrtc.SDPTypeAnswer, - SDP: sdp, - }); err != nil { - logger.Debugf("set remote pub desc error: %v", err) - return - } - - logger.Infof("[salutejazz] publisher answer received video=%t", s.videoOffered.Load()) - if s.videoOffered.Load() { - closeSignal(s.videoNegotiated) - } -} - -func (s *Session) handleICE(payload map[string]any) { - candidates, _ := payload["rtcIceCandidates"].([]any) - - for _, c := range candidates { - cand, _ := c.(map[string]any) - candStr, _ := cand["candidate"].(string) - target, _ := cand["target"].(string) - sdpMid, _ := cand["sdpMid"].(string) - sdpMLineIndex, _ := cand["sdpMLineIndex"].(float64) - - init := webrtc.ICECandidateInit{ - Candidate: candStr, - SDPMid: &sdpMid, - SDPMLineIndex: func() *uint16 { v := uint16(sdpMLineIndex); return &v }(), - } - - switch target { - case "SUBSCRIBER": - _ = s.pcSub.AddICECandidate(init) - case "PUBLISHER": - _ = s.pcPub.AddICECandidate(init) - } - } -} - -func (s *Session) updateWSDeadline() { - s.wsMu.Lock() - if s.ws != nil { - _ = s.ws.SetReadDeadline(time.Now().Add(wsReadTimeout)) - } - s.wsMu.Unlock() -} - -// Send queues data for transmission. -func (s *Session) Send(data []byte) error { - if s.dc == nil || s.dc.ReadyState() != webrtc.DataChannelStateOpen { - return ErrDataChannelNotReady - } - if s.sendQueueClosed.Load() { - return ErrSendQueueClosed - } - - select { - case s.sendQueue <- data: - return nil - case <-time.After(sendQueueTimeout): - return ErrSendQueueTimeout - } -} - -func (s *Session) processSendQueue() { - for { - select { - case <-s.sessionCloseCh: - return - case <-s.closeCh: - return - case data := <-s.sendQueue: - if len(data) > maxDataChannelMessageSize { - logger.Debugf("[salutejazz] Message too large: %d bytes (max %d)", len(data), maxDataChannelMessageSize) - continue - } - - encoded := EncodeDataPacket(data) - logger.Verbosef("[salutejazz] Sending %d bytes (encoded to %d bytes)", len(data), len(encoded)) - - if err := s.dc.Send(encoded); err != nil { - logger.Debugf("send error: %v", err) - s.queueReconnect() - return - } - time.Sleep(sendDelay) - } - } -} - -// Close terminates the connection. -// -// Close ordering matters: the WebSocket is shut down BEFORE wg.Wait so that -// handleSignaling, which is parked in ws.ReadJSON, unblocks immediately. If -// we waited on wg first the ReadJSON would only return once the deferred -// ws.Close further down ran, eating the full closeWaitTimeout (and on top -// of that the e2e harness only allows ~20s for goroutines to drain after -// cancel — long enough for pion's TURN refresh storm to push the client -// past the deadline). The data channel and peer connections are torn down -// after the WS so that any final ICE / signaling cleanup the goroutines do -// on their way out still has somewhere to write. -// -// pion's PeerConnection.Close blocks until the ICE agent and its TURN -// allocations drain; on the jazz e2e runner most relays get rejected with -// "403: Forbidden IP" and the agent keeps logging "Failed to handle -// message: the agent is closed" every 2s while it churns through them. We -// fire dc/pc closes in parallel and cap them with pcCloseTimeout so a -// stuck pion goroutine never holds up the carrier link teardown past the -// e2e harness's 20s budget. -func (s *Session) Close() error { - s.closed.Store(true) - s.sendQueueClosed.Store(true) - - close(s.closeCh) - s.shutdownWebSocket() - - done := make(chan struct{}) - go func() { - s.wg.Wait() - close(done) - }() - - select { - case <-done: - case <-time.After(closeWaitTimeout): - } - - closers := make([]func() error, 0, 3) - if s.dc != nil { - closers = append(closers, s.dc.Close) - } - if s.pcPub != nil { - closers = append(closers, s.pcPub.Close) - } - if s.pcSub != nil { - closers = append(closers, s.pcSub.Close) - } - closeWithDeadline(closers, pcCloseTimeout) - return nil -} - -// closeWithDeadline runs the supplied Close funcs concurrently and returns -// once all of them have returned OR the deadline elapses, whichever comes -// first. Stragglers (typically a pion PeerConnection.Close waiting on a -// wedged TURN allocation) are left to finish in the background so they -// don't block carrier-link teardown. -func closeWithDeadline(closers []func() error, timeout time.Duration) { - if len(closers) == 0 { - return - } - var wg sync.WaitGroup - wg.Add(len(closers)) - for _, fn := range closers { - go func(fn func() error) { - defer wg.Done() - _ = fn() - }(fn) - } - done := make(chan struct{}) - go func() { - wg.Wait() - close(done) - }() - select { - case <-done: - case <-time.After(timeout): - } -} - -// shutdownWebSocket politely closes the connector WebSocket and trips its -// read deadline to the past so any blocked ReadJSON in handleSignaling -// returns immediately. The conn pointer is left intact on purpose: writers -// elsewhere (sendICECandidate, etc.) gate on s.closed.Load() rather than a -// nil check, and zeroing it here would race with handleSignaling reading -// s.ws unlocked. Safe to call multiple times — gorilla/websocket Close is -// idempotent. -func (s *Session) shutdownWebSocket() { - s.wsMu.Lock() - defer s.wsMu.Unlock() - if s.ws == nil { - return - } - _ = s.ws.WriteControl(websocket.CloseMessage, - websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""), - time.Now().Add(time.Second)) - _ = s.ws.SetReadDeadline(time.Now()) - _ = s.ws.Close() -} - -// AddVideoTrack adds a video track to the publisher peer connection. -func (s *Session) AddVideoTrack(track webrtc.TrackLocal) error { - s.videoTrackMu.Lock() - s.videoTracks = append(s.videoTracks, track) - if s.pcPub != nil && s.audioTrack == nil { - if err := s.ensurePublisherAudioTrackLocked(); err != nil { - s.videoTrackMu.Unlock() - return err - } - } - s.videoTrackMu.Unlock() - - if s.pcPub == nil { - return nil - } - if !s.publisherStarted.Load() { - if track != nil && track.Kind() == webrtc.RTPCodecTypeVideo { - if _, err := s.pcPub.AddTrack(track); err != nil { - return fmt.Errorf("failed to add track: %w", err) - } - s.videoOffered.Store(true) - } - return nil - } - return nil -} - -// SetVideoTrackHandler registers a callback for remote video tracks. -func (s *Session) SetVideoTrackHandler(cb func(*webrtc.TrackRemote, *webrtc.RTPReceiver)) { - s.videoTrackMu.Lock() - defer s.videoTrackMu.Unlock() - s.onVideoTrack = cb -} - -// SetReconnectCallback sets the callback for reconnection events. -func (s *Session) SetReconnectCallback(cb func(*webrtc.DataChannel)) { s.onReconnect = cb } - -// SetShouldReconnect sets the policy for reconnection. -func (s *Session) SetShouldReconnect(fn func() bool) { s.shouldReconnect = fn } - -// SetEndedCallback sets the callback for connection termination. -func (s *Session) SetEndedCallback(cb func(string)) { s.onEnded = cb } - -// WatchConnection monitors the connection lifecycle. -func (s *Session) WatchConnection(ctx context.Context) { - for { - select { - case <-ctx.Done(): - return - case <-s.closeCh: - return - case <-s.reconnectCh: - } - } -} - -// CanSend checks if data can be sent. -func (s *Session) CanSend() bool { - if s.onData == nil { - if s.hasLocalVideoTracks() { - return !s.closed.Load() && s.subscriberReady.Load() && s.publisherReady.Load() - } - return !s.closed.Load() && s.subscriberReady.Load() - } - if s.dc == nil || s.dc.ReadyState() != webrtc.DataChannelStateOpen { - return false - } - return len(s.sendQueue) < 4000 -} - -// GetSendQueue returns the transmission queue. -func (s *Session) GetSendQueue() chan []byte { return s.sendQueue } - -// GetBufferedAmount returns the WebRTC buffered amount. -func (s *Session) GetBufferedAmount() uint64 { - if s.dc != nil { - return s.dc.BufferedAmount() - } - return 0 -} - -func (s *Session) queueReconnect() { - if s.closed.Load() || s.reconnecting.Load() { - return - } - if s.shouldReconnect != nil && !s.shouldReconnect() { - return - } - select { - case s.reconnectCh <- struct{}{}: - default: - } -} - -func init() { //nolint:gochecknoinits // engine registration is the canonical Go pattern for plugins - engine.Register("salutejazz", New) -} diff --git a/internal/engine/salutejazz/session_helpers_test.go b/internal/engine/salutejazz/session_helpers_test.go deleted file mode 100644 index 6fea00e..0000000 --- a/internal/engine/salutejazz/session_helpers_test.go +++ /dev/null @@ -1,320 +0,0 @@ -package salutejazz - -import ( - "context" - "errors" - "net/http" - "net/http/httptest" - "testing" - - "github.com/gorilla/websocket" - "github.com/pion/webrtc/v4" -) - -const ( - testJazzGroupID = "group-1" - testJazzRoomID = "room-1" -) - -//nolint:cyclop // table-driven test naturally has many branches -func TestSessionStateHelpers(t *testing.T) { - s := &Session{ - reconnectCh: make(chan struct{}, 1), - closeCh: make(chan struct{}), - sessionCloseCh: make(chan struct{}), - sendQueue: make(chan []byte, 1), - subscriberConn: make(chan struct{}), - publisherConn: make(chan struct{}), - } - - s.resetMediaState() - if s.subscriberReady.Load() || s.publisherReady.Load() || s.subscriberConn == nil || s.publisherConn == nil { - t.Fatal("resetMediaState() did not reset readiness") - } - if s.hasLocalVideoTracks() { - t.Fatal("hasLocalVideoTracks() = true without tracks") - } - if err := s.AddVideoTrack(nil); err != nil { - t.Fatalf("AddVideoTrack(nil) error = %v", err) - } - if !s.hasLocalVideoTracks() { - t.Fatal("hasLocalVideoTracks() = false after AddVideoTrack") - } - - s.SetVideoTrackHandler(func(*webrtc.TrackRemote, *webrtc.RTPReceiver) {}) - if s.videoTrackHandler() == nil { - t.Fatal("videoTrackHandler() = nil") - } - - cfg := defaultWebRTCConfig() - if cfg.SDPSemantics != webrtc.SDPSemanticsUnifiedPlan || cfg.BundlePolicy != webrtc.BundlePolicyMaxBundle { - t.Fatalf("defaultWebRTCConfig() = %+v", cfg) - } - if s.buildAPI() == nil { - t.Fatal("buildAPI() returned nil") - } -} - -func TestSessionCallbacksQueueReconnectAndClose(t *testing.T) { - s := &Session{ - reconnectCh: make(chan struct{}, 1), - closeCh: make(chan struct{}), - sessionCloseCh: make(chan struct{}), - sendQueue: make(chan []byte, 1), - } - - s.SetReconnectCallback(func(*webrtc.DataChannel) {}) - s.SetShouldReconnect(func() bool { return true }) - s.SetEndedCallback(func(string) {}) - if s.onReconnect == nil || s.shouldReconnect == nil || s.onEnded == nil { - t.Fatal("callbacks were not stored") - } - - s.queueReconnect() - select { - case <-s.reconnectCh: - default: - t.Fatal("queueReconnect() did not enqueue") - } - - s.SetShouldReconnect(func() bool { return false }) - s.queueReconnect() - select { - case <-s.reconnectCh: - t.Fatal("queueReconnect() enqueued despite policy=false") - default: - } - - done := make(chan struct{}) - go func() { - s.WatchConnection(context.Background()) - close(done) - }() - if err := s.Close(); err != nil { - t.Fatalf("Close() error = %v", err) - } - <-done - if err := s.Send([]byte("closed")); !errors.Is(err, ErrDataChannelNotReady) { - t.Fatalf("Send() error = %v, want datachannel not ready", err) - } -} - -func TestSessionCanSendVideoOnlyModes(t *testing.T) { - s := &Session{sendQueue: make(chan []byte, 1)} - s.subscriberReady.Store(true) - if !s.CanSend() { - t.Fatal("CanSend() = false for subscriber-ready session without local video") - } - _ = s.AddVideoTrack(nil) - if s.CanSend() { - t.Fatal("CanSend() = true with local video but publisher not ready") - } - s.publisherReady.Store(true) - if !s.CanSend() { - t.Fatal("CanSend() = false with subscriber and publisher ready") - } - s.closed.Store(true) - if s.CanSend() { - t.Fatal("CanSend() = true for closed session") - } -} - -func TestSendPublisherTrackAddWritesJazzPayload(t *testing.T) { - msgCh := make(chan map[string]any, 1) - upgrader := websocket.Upgrader{ - CheckOrigin: func(*http.Request) bool { return true }, - } - server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - conn, err := upgrader.Upgrade(w, r, nil) - if err != nil { - t.Errorf("upgrade websocket: %v", err) - return - } - defer func() { _ = conn.Close() }() - - var msg map[string]any - if err := conn.ReadJSON(&msg); err != nil { - t.Errorf("read json: %v", err) - return - } - msgCh <- msg - })) - defer server.Close() - - wsURL := "ws" + server.URL[len("http"):] - conn, resp, err := websocket.DefaultDialer.Dial(wsURL, nil) - if resp != nil && resp.Body != nil { - _ = resp.Body.Close() - } - if err != nil { - t.Fatalf("dial websocket: %v", err) - } - defer func() { _ = conn.Close() }() - - s := &Session{ - roomID: testJazzRoomID, - groupID: testJazzGroupID, - ws: conn, - } - if err := s.sendPublisherTrackAdd("VIDEO", "CAMERA", false); err != nil { - t.Fatalf("sendPublisherTrackAdd() error = %v", err) - } - - msg := <-msgCh - assertJazzTrackAddEnvelope(t, msg) - assertJazzTrackAddPayload(t, msg[keyPayload]) -} - -func TestHandleParticipantsUpdateUnmutesCameraTrack(t *testing.T) { - msgCh := make(chan map[string]any, 1) - upgrader := websocket.Upgrader{ - CheckOrigin: func(*http.Request) bool { return true }, - } - server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - conn, err := upgrader.Upgrade(w, r, nil) - if err != nil { - t.Errorf("upgrade websocket: %v", err) - return - } - defer func() { _ = conn.Close() }() - - var msg map[string]any - if err := conn.ReadJSON(&msg); err != nil { - t.Errorf("read json: %v", err) - return - } - msgCh <- msg - })) - defer server.Close() - - wsURL := "ws" + server.URL[len("http"):] - conn, resp, err := websocket.DefaultDialer.Dial(wsURL, nil) - if resp != nil && resp.Body != nil { - _ = resp.Body.Close() - } - if err != nil { - t.Fatalf("dial websocket: %v", err) - } - defer func() { _ = conn.Close() }() - - s := &Session{ - roomID: testJazzRoomID, - groupID: testJazzGroupID, - ws: conn, - videoTracks: []webrtc.TrackLocal{nil}, - } - s.videoOffered.Store(true) - s.handleParticipantsUpdate(map[string]any{ - "update": map[string]any{ - "participants": []any{ - map[string]any{ - "isPublisher": true, - "tracks": []any{ - map[string]any{ - "sid": "TR_CAMERA_1", - "type": "VIDEO", - "source": "CAMERA", - payloadMuted: true, - }, - }, - }, - }, - }, - }) - - msg := <-msgCh - assertJazzTrackAddEnvelope(t, msg) - assertJazzTrackMutedPayload(t, msg[keyPayload]) -} - -func TestJazzICECandidatePayload(t *testing.T) { - sdpMid := "0" - sdpMLineIndex := uint16(1) - usernameFragment := "ufrag-1" - - got := jazzICECandidatePayload(webrtc.ICECandidateInit{ - Candidate: "candidate:1 1 udp 1 127.0.0.1 12345 typ host", - SDPMid: &sdpMid, - SDPMLineIndex: &sdpMLineIndex, - UsernameFragment: &usernameFragment, - }, "PUBLISHER") - - if got["candidate"] != "candidate:1 1 udp 1 127.0.0.1 12345 typ host" { - t.Fatalf("candidate = %v", got["candidate"]) - } - if got["sdpMid"] != "0" { - t.Fatalf("sdpMid = %v, want 0", got["sdpMid"]) - } - if got["sdpMLineIndex"] != uint16(1) { - t.Fatalf("sdpMLineIndex = %v, want 1", got["sdpMLineIndex"]) - } - if got["usernameFragment"] != "ufrag-1" { - t.Fatalf("usernameFragment = %v, want ufrag-1", got["usernameFragment"]) - } - if got["target"] != "PUBLISHER" { - t.Fatalf("target = %v, want PUBLISHER", got["target"]) - } -} - -func assertJazzTrackAddEnvelope(t *testing.T, msg map[string]any) { - t.Helper() - - if msg[keyRoomID] != testJazzRoomID { - t.Fatalf("roomId = %v, want %s", msg[keyRoomID], testJazzRoomID) - } - if msg[keyEvent] != eventMediaIn { - t.Fatalf("event = %v, want %s", msg[keyEvent], eventMediaIn) - } - if msg[keyGroupID] != testJazzGroupID { - t.Fatalf("%s = %v, want %s", keyGroupID, msg[keyGroupID], testJazzGroupID) - } -} - -func assertJazzTrackAddPayload(t *testing.T, raw any) { - t.Helper() - - payload, ok := raw.(map[string]any) - if !ok { - t.Fatalf("payload missing or wrong type: %+v", raw) - } - if payload[payloadMethod] != "rtc:track:add" { - t.Fatalf("%s = %v, want rtc:track:add", payloadMethod, payload[payloadMethod]) - } - - track, ok := payload[payloadTrack].(map[string]any) - if !ok { - t.Fatalf("track missing or wrong type: %+v", payload[payloadTrack]) - } - if track[payloadType] != "VIDEO" { - t.Fatalf("%s = %v, want VIDEO", payloadType, track[payloadType]) - } - if track["source"] != "CAMERA" { - t.Fatalf("source = %v, want CAMERA", track["source"]) - } - if track[payloadMuted] != false { - t.Fatalf("muted = %v, want false", track[payloadMuted]) - } -} - -func assertJazzTrackMutedPayload(t *testing.T, raw any) { - t.Helper() - - payload, ok := raw.(map[string]any) - if !ok { - t.Fatalf("payload missing or wrong type: %+v", raw) - } - if payload[payloadMethod] != "rtc:track:muted" { - t.Fatalf("%s = %v, want rtc:track:muted", payloadMethod, payload[payloadMethod]) - } - - mute, ok := payload["mute"].(map[string]any) - if !ok { - t.Fatalf("mute missing or wrong type: %+v", payload["mute"]) - } - if mute["sid"] != "TR_CAMERA_1" { - t.Fatalf("sid = %v, want TR_CAMERA_1", mute["sid"]) - } - if mute[payloadMuted] != false { - t.Fatalf("muted = %v, want false", mute[payloadMuted]) - } -} diff --git a/mobile/mobile.go b/mobile/mobile.go index 0eb62f9..10a8678 100644 --- a/mobile/mobile.go +++ b/mobile/mobile.go @@ -55,8 +55,6 @@ const ( defaultDNSServer = "1.1.1.1:53" defaultHTTPPingURL = "https://www.google.com/generate_204" carrierWBStream = "wbstream" - carrierJazz = "jazz" - roomURLAny = "any" ) const ( @@ -165,7 +163,7 @@ func SetDebug(enabled bool) { } // Start launches the olcRTC client in background. -// carrierName: carrier name ("telemost", "jazz", "wbstream") +// carrierName: carrier name ("telemost", "wbstream", "jitsi") // roomID: carrier-specific room ID // clientID: client identifier that must match the server's -client-id // keyHex: 64-char hex encryption key @@ -746,7 +744,7 @@ func validateStartArgs(carrierName, roomID, clientID, keyHex string) error { switch { case carrierName == "": return errCarrierRequired - case roomID == "" && carrierName != carrierJazz: + case roomID == "": return errRoomIDRequired case clientID == "": return errClientIDRequired @@ -761,11 +759,6 @@ func buildRoomURL(carrierName, roomID string) string { switch carrierName { case "telemost": return "https://telemost.yandex.ru/j/" + roomID - case carrierJazz: - if roomID == "" { - return roomURLAny - } - return roomID case carrierWBStream: return roomID default: diff --git a/mobile/mobile_test.go b/mobile/mobile_test.go index 75c4810..74cfdc8 100644 --- a/mobile/mobile_test.go +++ b/mobile/mobile_test.go @@ -122,16 +122,13 @@ func TestNormalizeBuildRoomAndClamp(t *testing.T) { } } - if normalizeCarrier(carrierWBStream) != carrierWBStream || normalizeCarrier("jazz") != "jazz" { + if normalizeCarrier(carrierWBStream) != carrierWBStream || normalizeCarrier("jitsi") != "jitsi" { t.Fatal("normalizeCarrier() returned unexpected value") } if got := buildRoomURL("telemost", "abc"); got != "https://telemost.yandex.ru/j/abc" { t.Fatalf("telemost room URL = %q", got) } - if got := buildRoomURL("jazz", ""); got != "any" { - t.Fatalf("jazz empty room URL = %q", got) - } if got := buildRoomURL(carrierWBStream, "room"); got != "room" { t.Fatalf("wbstream room URL = %q", got) } @@ -150,17 +147,17 @@ func TestStartValidation(t *testing.T) { if err := startWithConfig("telemost", dataTransport, "", "client", "key", 1080, "", "", mobileConfig{}); !errors.Is(err, errRoomIDRequired) { //nolint:lll // long test description t.Fatalf("startWithConfig(missing room) = %v", err) } - if err := startWithConfig("jazz", dataTransport, "", "", "key", 1080, "", "", mobileConfig{}); !errors.Is(err, errClientIDRequired) { //nolint:lll // long test description + if err := startWithConfig("jitsi", dataTransport, "room", "", "key", 1080, "", "", mobileConfig{}); !errors.Is(err, errClientIDRequired) { //nolint:lll // long test description t.Fatalf("startWithConfig(missing client) = %v", err) } - if err := startWithConfig("jazz", dataTransport, "", "client", "", 1080, "", "", mobileConfig{}); !errors.Is(err, errKeyHexRequired) { //nolint:lll // long test description + if err := startWithConfig("jitsi", dataTransport, "room", "client", "", 1080, "", "", mobileConfig{}); !errors.Is(err, errKeyHexRequired) { //nolint:lll // long test description t.Fatalf("startWithConfig(missing key) = %v", err) } mu.Lock() cancel = func() {} mu.Unlock() - if err := startWithConfig("jazz", dataTransport, "", "client", "key", 1080, "", "", mobileConfig{}); !errors.Is(err, errAlreadyRunning) { //nolint:lll // long test description + if err := startWithConfig("jitsi", dataTransport, "room", "client", "key", 1080, "", "", mobileConfig{}); !errors.Is(err, errAlreadyRunning) { //nolint:lll // long test description t.Fatalf("startWithConfig(running) = %v", err) } resetMobileGlobals(t) @@ -176,8 +173,8 @@ func TestStartWithInjectedRunnerLifecycle(t *testing.T) { runClientWithReady = func(ctx context.Context, cfg client.Config, onReady func()) error { opts, _ := cfg.TransportOptions.(vp8channel.Options) - if cfg.Transport != dataTransport || cfg.Carrier != carrierJazz || - cfg.RoomURL != "any" || cfg.DeviceID != "client" || cfg.LocalAddr != "127.0.0.1:1080" || + if cfg.Transport != dataTransport || cfg.Carrier != "jitsi" || + cfg.RoomURL != "room" || cfg.DeviceID != "client" || cfg.LocalAddr != "127.0.0.1:1080" || cfg.DNSServer != defaultDNSServer || opts.FPS != 60 || opts.BatchSize != 8 || cfg.Liveness.Interval != 2500*time.Millisecond || cfg.Liveness.Timeout != 750*time.Millisecond || @@ -194,7 +191,7 @@ func TestStartWithInjectedRunnerLifecycle(t *testing.T) { return ctx.Err() } - if err := StartWithTransport(carrierJazz, "dc", "", "client", "key", 1080, "", ""); err != nil { + if err := StartWithTransport("jitsi", "dc", "room", "client", "key", 1080, "", ""); err != nil { t.Fatalf("StartWithTransport() error = %v", err) } if !IsRunning() { @@ -252,7 +249,7 @@ func TestStartUsesDefaultsAndCheckWithInjectedRunner(t *testing.T) { <-ctx.Done() return nil } - elapsed, err := Check("jazz", "dc", "", "client", "key", 1082, 100, -1, 999) + elapsed, err := Check("jitsi", "dc", "room", "client", "key", 1082, 100, -1, 999) if err != nil { t.Fatalf("Check() error = %v", err) } @@ -276,7 +273,7 @@ func TestPingPassesLiveness(t *testing.T) { return nil } - _, _ = Ping("jazz", "dc", "", "client", "key", 1085, 100, "http://127.0.0.1/", 30, 1) + _, _ = Ping("jitsi", "dc", "room", "client", "key", 1085, 100, "http://127.0.0.1/", 30, 1) select { case got := <-seen: if got.Interval != 4000*time.Millisecond || got.Timeout != 1500*time.Millisecond || got.Failures != 6 { diff --git a/pkg/olcrtc/olcrtc.go b/pkg/olcrtc/olcrtc.go index f118515..eea88eb 100644 --- a/pkg/olcrtc/olcrtc.go +++ b/pkg/olcrtc/olcrtc.go @@ -11,7 +11,7 @@ // conn, err := sess.Dial(ctx) // blocks until WebRTC data channel is ready // // conn implements net.Conn — pass it to sing-box / any io.ReadWriter consumer // -// Built-in auth providers (jitsi, telemost, jazz, wbstream): +// Built-in auth providers (jitsi, telemost, wbstream): // // sess, err := olcrtc.New(ctx, olcrtc.Config{ // Auth: "jitsi", @@ -52,13 +52,13 @@ var ( // Config is the input to [New]. type Config struct { // --- built-in auth mode --- - // Auth is the name of a registered auth provider ("jitsi", "telemost", "jazz", "wbstream"). + // Auth is the name of a registered auth provider ("jitsi", "telemost", "wbstream"). // When set, RoomID is forwarded to the provider as the room reference. Auth string RoomID string // --- direct engine mode (Auth == "") --- - // Engine selects the SFU protocol ("livekit", "goolom", "salutejazz"). + // Engine selects the SFU protocol ("livekit", "goolom", "jitsi"). // Defaults to "livekit" when Auth is empty. Engine string URL string @@ -77,9 +77,9 @@ type Config struct { // Session is the library handle returned by [New]. // Call [Session.Dial] to connect and obtain a [net.Conn]. type Session struct { - inner engine.Session - pr *io.PipeReader - pw *io.PipeWriter + inner engine.Session + pr *io.PipeReader + pw *io.PipeWriter authProvider auth.Provider authCfg auth.Config } @@ -241,7 +241,7 @@ func (s *Session) SetShouldReconnect(fn func() bool) { // CreateRoom creates a new room via the auth provider and returns the room ID. // Only works when the session was created with Auth set to a provider that -// supports room creation (wbstream, jazz). Returns [ErrRoomCreationUnsupported] +// supports room creation (wbstream). Returns [ErrRoomCreationUnsupported] // for providers that don't support it (e.g. telemost). func CreateRoom(ctx context.Context, authName string) (string, error) { p, err := auth.Get(authName) diff --git a/pkg/olcrtc/tunnel/tunnel.go b/pkg/olcrtc/tunnel/tunnel.go index 9690ce4..9b060c2 100644 --- a/pkg/olcrtc/tunnel/tunnel.go +++ b/pkg/olcrtc/tunnel/tunnel.go @@ -29,7 +29,7 @@ // } // // Call [RegisterDefaults] once at program start to register the built-in -// carriers (jitsi, telemost, jazz, wbstream) and transports (datachannel, +// carriers (jitsi, telemost, wbstream) and transports (datachannel, // videochannel, seichannel, vp8channel). package tunnel @@ -72,11 +72,11 @@ type TrafficFunc = server.TrafficFunc type Config struct { // --- carrier selection --- Transport string // datachannel, videochannel, seichannel, vp8channel - Carrier string // jitsi, telemost, jazz, wbstream, none + Carrier string // jitsi, telemost, wbstream, none RoomURL string // conference room identifier for the carrier // --- direct engine mode (Carrier == "none") --- - Engine string // livekit, goolom, salutejazz, jitsi + Engine string // livekit, goolom, jitsi URL string Token string diff --git a/script/cnc.sh b/script/cnc.sh index ee9a7ef..c77691e 100755 --- a/script/cnc.sh +++ b/script/cnc.sh @@ -85,18 +85,14 @@ validate_key() { echo "Select auth provider:" echo " 1) jitsi" echo " 2) telemost" -echo " 3) jazz" -echo " 4) wbstream" -read -p "Enter choice [1-4, default: 1]: " AUTH_CHOICE +echo " 3) wbstream" +read -p "Enter choice [1-3, default: 1]: " AUTH_CHOICE case "$AUTH_CHOICE" in 2) AUTH="telemost" ;; 3) - AUTH="jazz" - ;; - 4) AUTH="wbstream" ;; *) diff --git a/script/docker/olcrtc-entrypoint.sh b/script/docker/olcrtc-entrypoint.sh index a8588d9..989df5a 100644 --- a/script/docker/olcrtc-entrypoint.sh +++ b/script/docker/olcrtc-entrypoint.sh @@ -55,7 +55,7 @@ case "$mode" in srv|cnc) ;; *) die "set OLCRTC_MODE to srv or cnc" ;; esac -[ -n "$carrier" ] || die "set OLCRTC_CARRIER (e.g. jitsi, telemost, jazz, wbstream)" +[ -n "$carrier" ] || die "set OLCRTC_CARRIER (e.g. jitsi, telemost, wbstream)" [ -n "$transport" ] || die "set OLCRTC_TRANSPORT (e.g. datachannel, videochannel, seichannel, vp8channel)" make_key() { @@ -67,30 +67,7 @@ make_key() { } if [ -z "$room_id" ]; then - case "$carrier" in - jazz) - [ "$mode" = "srv" ] || die "set OLCRTC_ROOM_ID to the server room identifier" - echo "olcrtc-entrypoint: OLCRTC_ROOM_ID not set, generating room..." >&2 - gen_config="/tmp/olcrtc-gen.yaml" - cat > "$gen_config" <&2 - rm -f "$gen_config" - ;; - *) - die "set OLCRTC_ROOM_ID to the room identifier" - ;; - esac + die "set OLCRTC_ROOM_ID to the room identifier" fi if [ -z "$key" ]; then diff --git a/script/srv.sh b/script/srv.sh index 20f3b90..4ed95b3 100755 --- a/script/srv.sh +++ b/script/srv.sh @@ -81,18 +81,14 @@ validate_key() { echo "Select carrier:" echo " 1) jitsi" echo " 2) telemost" -echo " 3) jazz" -echo " 4) wbstream" -read -p "Enter choice [1-4, default: 1]: " CARRIER_CHOICE +echo " 3) wbstream" +read -p "Enter choice [1-3, default: 1]: " CARRIER_CHOICE case "$CARRIER_CHOICE" in 2) CARRIER="telemost" ;; 3) - CARRIER="jazz" - ;; - 4) CARRIER="wbstream" ;; *) @@ -130,27 +126,7 @@ echo "" GEN_ROOM=0 -if [ "$CARRIER" = "jazz" ]; then - echo "Room options:" - echo " 1) Auto-generate new room (recommended)" - echo " 2) Use specific room ID" - read -p "Enter choice [1-2, default: 1]: " ROOM_CHOICE - - case "$ROOM_CHOICE" in - 2) - read -p "Enter Room ID: " ROOM_ID - if [ -z "$ROOM_ID" ]; then - echo "[X] Room ID cannot be empty" - exit 1 - fi - ;; - *) - GEN_ROOM=1 - ROOM_ID="" - echo "[*] Will generate room before starting server" - ;; - esac -elif [ "$CARRIER" = "jitsi" ]; then +if [ "$CARRIER" = "jitsi" ]; then read -p "Jitsi base URL [default: https://meet.small-dm.ru/]: " JITSI_BASE_INPUT JITSI_BASE_URL=${JITSI_BASE_INPUT:-https://meet.small-dm.ru/} JITSI_BASE_URL="${JITSI_BASE_URL%/}"