diff --git a/code/wbstream_poc_datachannel.py b/code/wbstream_poc_datachannel.py index 3428fc7..45cbfef 100755 --- a/code/wbstream_poc_datachannel.py +++ b/code/wbstream_poc_datachannel.py @@ -2,8 +2,9 @@ """PoC: WB Stream DataChannel over LiveKit.""" import asyncio +import base64 +import json import logging -import uuid import requests try: @@ -20,6 +21,22 @@ HARDCODED_ROOM_ID = "019e23c2-a580-7550-b08a-7ac5342ca21f" TEST_ATTEMPTS = 60 TEST_MESSAGES = [f"WB Stream DataChannel attempt {idx:02d}" for idx in range(1, TEST_ATTEMPTS + 1)] + +def _decode_jwt_payload(token: str) -> dict: + """Decode JWT payload without verifying the signature; useful for inspecting LiveKit grants.""" + try: + payload = token.split(".")[1] + payload += "=" * (-len(payload) % 4) + return json.loads(base64.urlsafe_b64decode(payload)) + except Exception as exc: + return {"decode_error": str(exc)} + + +def _print_token_grants(label: str, token: str) -> None: + payload = _decode_jwt_payload(token) + print(f" {label} token identity={payload.get('sub')} name={payload.get('name')}") + print(f" {label} video grants={json.dumps(payload.get('video', {}), ensure_ascii=False, sort_keys=True)}") + def _get_room_token(room_id: str, display_name: str) -> tuple[str, str]: """Retrieves the room token via the guest API.""" headers = { @@ -47,7 +64,15 @@ def _get_room_token(room_id: str, display_name: str) -> tuple[str, str]: async def run_poc() -> dict: """Runs the complete PoC flow.""" print("\n--- WB Stream PoC ---") - results = {"server_ok": False, "client_ok": False, "sent": 0, "recv": 0, "errors": []} + results = { + "server_ok": False, + "client_ok": False, + "sent": 0, + "server_recv": 0, + "echo_sent": 0, + "client_recv": 0, + "errors": [], + } server, client = rtc.Room(), rtc.Room() shared_room_id = HARDCODED_ROOM_ID @@ -56,15 +81,30 @@ async def run_poc() -> dict: try: shared_room_id, server_tok = _get_room_token(shared_room_id, "OlcRTC-Server") _, client_tok = _get_room_token(shared_room_id, "OlcRTC-Client") + _print_token_grants("server", server_tok) + _print_token_grants("client", client_tok) @server.on("data_received") def on_server_data(dp: rtc.DataPacket): if dp.topic == "olcrtc": - asyncio.create_task(server.local_participant.publish_data(f"Echo: {dp.data.decode()}".encode(), topic="olcrtc")) + msg = dp.data.decode(errors="replace") + results["server_recv"] += 1 + print(f" <- Server recv #{results['server_recv']:02d}: {msg}") + + async def echo() -> None: + try: + await server.local_participant.publish_data(f"Echo: {msg}".encode(), topic="olcrtc") + results["echo_sent"] += 1 + except Exception as exc: + results["errors"].append(f"Echo failed: {exc}") + + asyncio.create_task(echo()) @client.on("data_received") def on_client_data(dp: rtc.DataPacket): - results["recv"] += 1 + if dp.topic == "olcrtc": + results["client_recv"] += 1 + print(f" <- Client recv #{results['client_recv']:02d}: {dp.data.decode(errors='replace')}") await server.connect(WS_URL, server_tok) results["server_ok"] = True @@ -98,10 +138,14 @@ async def run_poc() -> dict: def print_results(res: dict): print("\n--- TEST RESULTS ---") print(f"Server: {':P' if res['server_ok'] else 'X'} / Client: {':P' if res['client_ok'] else 'X'}") - print(f"Messages: Sent {res['sent']} / Recv {res['recv']}") + print( + "Messages: " + f"Client sent {res['sent']} / Server recv {res['server_recv']} / " + f"Echo sent {res['echo_sent']} / Client recv {res['client_recv']}" + ) if res['errors']: for e in res['errors']: print(f" Error: {e}") - print(f"\n{':P SUCCESS' if res['sent'] and res['sent'] == res['recv'] else 'X FAILED'}\n") + print(f"\n{':P SUCCESS' if res['sent'] and res['sent'] == res['client_recv'] else 'X FAILED'}\n") if __name__ == "__main__": try: