feat(code): add token grant logging and message stats

This commit is contained in:
zarazaex69
2026-05-14 03:56:31 +03:00
parent 3249a109f8
commit 1897c13550

View File

@@ -2,8 +2,9 @@
"""PoC: WB Stream DataChannel over LiveKit."""
import asyncio
import base64
import json
import logging
import uuid
import requests
try:
@@ -20,6 +21,22 @@ HARDCODED_ROOM_ID = "019e23c2-a580-7550-b08a-7ac5342ca21f"
TEST_ATTEMPTS = 60
TEST_MESSAGES = [f"WB Stream DataChannel attempt {idx:02d}" for idx in range(1, TEST_ATTEMPTS + 1)]
def _decode_jwt_payload(token: str) -> dict:
"""Decode JWT payload without verifying the signature; useful for inspecting LiveKit grants."""
try:
payload = token.split(".")[1]
payload += "=" * (-len(payload) % 4)
return json.loads(base64.urlsafe_b64decode(payload))
except Exception as exc:
return {"decode_error": str(exc)}
def _print_token_grants(label: str, token: str) -> None:
payload = _decode_jwt_payload(token)
print(f" {label} token identity={payload.get('sub')} name={payload.get('name')}")
print(f" {label} video grants={json.dumps(payload.get('video', {}), ensure_ascii=False, sort_keys=True)}")
def _get_room_token(room_id: str, display_name: str) -> tuple[str, str]:
"""Retrieves the room token via the guest API."""
headers = {
@@ -47,7 +64,15 @@ def _get_room_token(room_id: str, display_name: str) -> tuple[str, str]:
async def run_poc() -> dict:
"""Runs the complete PoC flow."""
print("\n--- WB Stream PoC ---")
results = {"server_ok": False, "client_ok": False, "sent": 0, "recv": 0, "errors": []}
results = {
"server_ok": False,
"client_ok": False,
"sent": 0,
"server_recv": 0,
"echo_sent": 0,
"client_recv": 0,
"errors": [],
}
server, client = rtc.Room(), rtc.Room()
shared_room_id = HARDCODED_ROOM_ID
@@ -56,15 +81,30 @@ async def run_poc() -> dict:
try:
shared_room_id, server_tok = _get_room_token(shared_room_id, "OlcRTC-Server")
_, client_tok = _get_room_token(shared_room_id, "OlcRTC-Client")
_print_token_grants("server", server_tok)
_print_token_grants("client", client_tok)
@server.on("data_received")
def on_server_data(dp: rtc.DataPacket):
if dp.topic == "olcrtc":
asyncio.create_task(server.local_participant.publish_data(f"Echo: {dp.data.decode()}".encode(), topic="olcrtc"))
msg = dp.data.decode(errors="replace")
results["server_recv"] += 1
print(f" <- Server recv #{results['server_recv']:02d}: {msg}")
async def echo() -> None:
try:
await server.local_participant.publish_data(f"Echo: {msg}".encode(), topic="olcrtc")
results["echo_sent"] += 1
except Exception as exc:
results["errors"].append(f"Echo failed: {exc}")
asyncio.create_task(echo())
@client.on("data_received")
def on_client_data(dp: rtc.DataPacket):
results["recv"] += 1
if dp.topic == "olcrtc":
results["client_recv"] += 1
print(f" <- Client recv #{results['client_recv']:02d}: {dp.data.decode(errors='replace')}")
await server.connect(WS_URL, server_tok)
results["server_ok"] = True
@@ -98,10 +138,14 @@ async def run_poc() -> dict:
def print_results(res: dict):
print("\n--- TEST RESULTS ---")
print(f"Server: {':P' if res['server_ok'] else 'X'} / Client: {':P' if res['client_ok'] else 'X'}")
print(f"Messages: Sent {res['sent']} / Recv {res['recv']}")
print(
"Messages: "
f"Client sent {res['sent']} / Server recv {res['server_recv']} / "
f"Echo sent {res['echo_sent']} / Client recv {res['client_recv']}"
)
if res['errors']:
for e in res['errors']: print(f" Error: {e}")
print(f"\n{':P SUCCESS' if res['sent'] and res['sent'] == res['recv'] else 'X FAILED'}\n")
print(f"\n{':P SUCCESS' if res['sent'] and res['sent'] == res['client_recv'] else 'X FAILED'}\n")
if __name__ == "__main__":
try: