refactor: migrate jazz to engine/salutejazz + auth/salutejazz

Split the SaluteJazz provider along the same engine/auth seam used for
WB Stream:
- internal/engine/salutejazz — Sber WS+SDP signaling engine (pub/sub
  split, _reliable data channel, length-prefixed DataPacket envelope).
  Consumes URL/Token/Extra[password] from engine.Config; no embedded
  HTTP/auth logic. Registered as engine "salutejazz".
- internal/auth/salutejazz — create-meeting + preconnect flow.
  Implements auth.Provider (Engine() → "salutejazz") and
  auth.RoomCreator. Accepts cfg.RoomURL in "<roomID>:<password>" form
  for join, or empty / "any" / "dummy" for create-on-the-fly, matching
  the legacy provider.

The carrier name "jazz" now goes through registerEngineAuth.
engine.Config gains an Extra map so auth providers can pass engine-
specific fields (password here); engine_adapter forwards
auth.Credentials.Extra into it.

session.Gen for jazz uses the auth.RoomCreator capability. Output now
includes the password ("<roomID>:<password>") — without it the printed
room is not joinable, so the legacy roomID-only output was effectively
broken for the gen flow.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
zarazaex69
2026-05-11 03:35:51 +03:00
parent 071106a674
commit d48eb565f5
15 changed files with 909 additions and 1290 deletions

View File

@@ -9,6 +9,7 @@ import (
"time"
"github.com/openlibrecommunity/olcrtc/internal/auth"
authSaluteJazz "github.com/openlibrecommunity/olcrtc/internal/auth/salutejazz"
authWBStream "github.com/openlibrecommunity/olcrtc/internal/auth/wbstream"
"github.com/openlibrecommunity/olcrtc/internal/carrier"
"github.com/openlibrecommunity/olcrtc/internal/carrier/builtin"
@@ -16,7 +17,6 @@ import (
"github.com/openlibrecommunity/olcrtc/internal/link"
"github.com/openlibrecommunity/olcrtc/internal/link/direct"
"github.com/openlibrecommunity/olcrtc/internal/names"
"github.com/openlibrecommunity/olcrtc/internal/provider/jazz"
"github.com/openlibrecommunity/olcrtc/internal/server"
"github.com/openlibrecommunity/olcrtc/internal/transport"
"github.com/openlibrecommunity/olcrtc/internal/transport/datachannel"
@@ -449,14 +449,18 @@ func genRetry(ctx context.Context, fn func(context.Context) error) error {
func Gen(ctx context.Context, cfg Config, out func(string)) error {
switch cfg.Carrier {
case carrierJazz:
creator, ok := any(authSaluteJazz.Provider{}).(auth.RoomCreator)
if !ok {
return fmt.Errorf("%w: jazz auth provider does not implement RoomCreator", ErrUnsupportedCarrier)
}
for i := range cfg.Amount {
var roomID string
err := genRetry(ctx, func(ctx context.Context) error {
info, err := jazz.CreateRoom(ctx)
var err error
roomID, err = creator.CreateRoom(ctx, auth.Config{Name: names.Generate()})
if err != nil {
return fmt.Errorf("jazz.CreateRoom: %w", err)
return fmt.Errorf("jazz CreateRoom: %w", err)
}
roomID = info.RoomID
return nil
})
if err != nil {

View File

@@ -1,5 +1,7 @@
// Package jazz implements the SaluteJazz WebRTC provider.
package jazz
// Package salutejazz is the auth provider for the SaluteJazz service. It
// creates / joins a Jazz room over HTTP and returns the connector
// WebSocket URL, room ID and password that the salutejazz engine consumes.
package salutejazz
import (
"bytes"
@@ -20,13 +22,13 @@ const (
contentTypeJSON = "application/json"
)
var apiBase = "https://bk.salutejazz.ru" //nolint:gochecknoglobals // package-level state intentional
var apiBase = "https://bk.salutejazz.ru" //nolint:gochecknoglobals // overridable base URL for tests
// RoomInfo contains connection details for a SaluteJazz room.
type RoomInfo struct {
RoomID string `json:"roomId"`
Password string `json:"password"`
ConnectorURL string `json:"connectorUrl"`
// roomInfo contains connection details for a SaluteJazz room.
type roomInfo struct {
RoomID string
Password string
ConnectorURL string
}
var (
@@ -34,14 +36,17 @@ var (
errPreconnectFailed = errors.New("preconnect failed")
)
func createRoom(ctx context.Context) (*RoomInfo, error) {
clientID := uuid.New().String()
headers := map[string]string{
"X-Jazz-ClientId": clientID,
func anonymousHeaders() map[string]string {
return map[string]string{
"X-Jazz-ClientId": uuid.New().String(),
headerAuthType: authTypeAnonymous,
"X-Client-AuthType": authTypeAnonymous,
headerContentType: contentTypeJSON,
}
}
func createRoom(ctx context.Context) (*roomInfo, error) {
headers := anonymousHeaders()
createResp, err := createMeeting(ctx, headers)
if err != nil {
@@ -53,18 +58,13 @@ func createRoom(ctx context.Context) (*RoomInfo, error) {
return nil, fmt.Errorf("preconnect: %w", err)
}
return &RoomInfo{
return &roomInfo{
RoomID: createResp.RoomID,
Password: createResp.Password,
ConnectorURL: connectorURL,
}, nil
}
// CreateRoom creates a SaluteJazz room and returns connection details for another peer to join.
func CreateRoom(ctx context.Context) (*RoomInfo, error) {
return createRoom(ctx)
}
type createResponse struct {
RoomID string `json:"roomId"`
Password string `json:"password"`
@@ -113,7 +113,6 @@ func createMeeting(ctx context.Context, headers map[string]string) (*createRespo
if err := json.NewDecoder(resp.Body).Decode(&res); err != nil {
return nil, fmt.Errorf("decode create response: %w", err)
}
return &res, nil
}
@@ -168,25 +167,16 @@ func preconnect(ctx context.Context, roomID, password string, headers map[string
if err := json.NewDecoder(preResp.Body).Decode(&preconnectResp); err != nil {
return "", fmt.Errorf("decode preconnect response: %w", err)
}
return preconnectResp.ConnectorURL, nil
}
func joinRoom(ctx context.Context, roomID, password string) (*RoomInfo, error) {
clientID := uuid.New().String()
headers := map[string]string{
"X-Jazz-ClientId": clientID,
"X-Jazz-AuthType": authTypeAnonymous,
"X-Client-AuthType": authTypeAnonymous,
"Content-Type": "application/json",
}
func joinRoom(ctx context.Context, roomID, password string) (*roomInfo, error) {
headers := anonymousHeaders()
connectorURL, err := preconnect(ctx, roomID, password, headers)
if err != nil {
return nil, err
}
return &RoomInfo{
return &roomInfo{
RoomID: roomID,
Password: password,
ConnectorURL: connectorURL,

View File

@@ -0,0 +1,67 @@
package salutejazz
import (
"context"
"fmt"
"strings"
"github.com/openlibrecommunity/olcrtc/internal/auth"
)
// Provider produces SaluteJazz credentials.
type Provider struct{}
// Engine reports which engine consumes credentials from this auth provider.
func (Provider) Engine() string { return "salutejazz" }
// Issue runs the SaluteJazz API flow and returns engine credentials.
//
// cfg.RoomURL accepts either an empty value (a new room is created on the
// fly, mirroring the legacy jazz provider) or "<roomID>:<password>".
func (Provider) Issue(ctx context.Context, cfg auth.Config) (auth.Credentials, error) {
roomRef := strings.TrimSpace(cfg.RoomURL)
var info *roomInfo
var err error
switch roomRef {
case "", "any", "dummy":
info, err = createRoom(ctx)
if err != nil {
return auth.Credentials{}, fmt.Errorf("create room: %w", err)
}
default:
roomID, password, hasPassword := strings.Cut(roomRef, ":")
if !hasPassword {
return auth.Credentials{}, fmt.Errorf("%w: expected <roomID>:<password>", auth.ErrRoomIDRequired)
}
info, err = joinRoom(ctx, roomID, password)
if err != nil {
return auth.Credentials{}, fmt.Errorf("join room: %w", err)
}
}
return auth.Credentials{
URL: info.ConnectorURL,
Token: info.RoomID,
Extra: map[string]string{
"password": info.Password,
"roomID": info.RoomID,
},
}, nil
}
// CreateRoom creates a new SaluteJazz room and returns "<roomID>:<password>".
//
// Returned format mirrors the legacy gen-mode output so existing
// subscriptions and tooling keep working.
func (Provider) CreateRoom(ctx context.Context, _ auth.Config) (string, error) {
info, err := createRoom(ctx)
if err != nil {
return "", fmt.Errorf("create room: %w", err)
}
return info.RoomID + ":" + info.Password, nil
}
func init() { //nolint:gochecknoinits // auth registration is the canonical Go pattern for plugins
auth.Register("salutejazz", Provider{})
}

View File

@@ -30,6 +30,7 @@ func registerEngineAuth(carrierName string, authProvider auth.Provider) {
URL: creds.URL,
Token: creds.Token,
Name: cfg.Name,
Extra: creds.Extra,
OnData: cfg.OnData,
DNSServer: cfg.DNSServer,
ProxyAddr: cfg.ProxyAddr,

View File

@@ -4,11 +4,12 @@ package builtin
import (
"context"
authSaluteJazz "github.com/openlibrecommunity/olcrtc/internal/auth/salutejazz"
authWBStream "github.com/openlibrecommunity/olcrtc/internal/auth/wbstream"
"github.com/openlibrecommunity/olcrtc/internal/carrier"
_ "github.com/openlibrecommunity/olcrtc/internal/engine/livekit" // engine registration via init
_ "github.com/openlibrecommunity/olcrtc/internal/engine/livekit" // engine registration via init
_ "github.com/openlibrecommunity/olcrtc/internal/engine/salutejazz" // engine registration via init
"github.com/openlibrecommunity/olcrtc/internal/provider"
"github.com/openlibrecommunity/olcrtc/internal/provider/jazz"
"github.com/openlibrecommunity/olcrtc/internal/provider/telemost"
)
@@ -17,12 +18,11 @@ type providerFactory func(context.Context, provider.Config) (provider.Provider,
// Register wires the built-in carriers into the carrier registry.
func Register() {
// Legacy provider-based carriers (still being migrated to engine+auth).
registerProvider("jazz", jazz.New)
registerProvider("telemost", telemost.New)
// Migrated to engine+auth: WB Stream now goes through the LiveKit engine
// with the wbstream auth provider.
// Migrated to engine+auth.
registerEngineAuth("wbstream", authWBStream.Provider{})
registerEngineAuth("jazz", authSaluteJazz.Provider{})
}
func registerProvider(name string, factory providerFactory) {

View File

@@ -18,11 +18,11 @@ import (
"github.com/openlibrecommunity/olcrtc/internal/app/session"
"github.com/openlibrecommunity/olcrtc/internal/auth"
authSaluteJazz "github.com/openlibrecommunity/olcrtc/internal/auth/salutejazz"
authWBStream "github.com/openlibrecommunity/olcrtc/internal/auth/wbstream"
"github.com/openlibrecommunity/olcrtc/internal/carrier"
"github.com/openlibrecommunity/olcrtc/internal/client"
"github.com/openlibrecommunity/olcrtc/internal/link"
"github.com/openlibrecommunity/olcrtc/internal/provider/jazz"
"github.com/openlibrecommunity/olcrtc/internal/server"
"github.com/pion/webrtc/v4"
)
@@ -358,11 +358,11 @@ func realRoomURL(ctx context.Context, t *testing.T, carrierName string) string {
if *realE2EJazzRoom != "" {
return *realE2EJazzRoom
}
room, err := jazz.CreateRoom(ctx)
room, err := authSaluteJazz.Provider{}.CreateRoom(ctx, auth.Config{Name: "olcrtc-e2e-room"})
if err != nil {
t.Fatalf("create real jazz room: %v", err)
}
return room.RoomID + ":" + room.Password
return room
case "telemost":
room := *realE2ETelemostRoom
if room != "" && !strings.HasPrefix(room, "http://") && !strings.HasPrefix(room, "https://") {

View File

@@ -32,10 +32,13 @@ type Capabilities struct {
// Config is the runtime input to an engine factory. URL/Token are produced by
// an auth provider (or supplied directly by the caller for "none" auth).
// Extra carries engine-specific fields that don't fit the common shape
// (e.g. SaluteJazz needs a separate room password alongside the room ID).
type Config struct {
URL string
Token string
Name string
Extra map[string]string
OnData func([]byte)
DNSServer string
ProxyAddr string

View File

@@ -1,5 +1,4 @@
// Package jazz implements the SaluteJazz WebRTC provider.
package jazz
package salutejazz
import (
"encoding/binary"

View File

@@ -0,0 +1,800 @@
// Package salutejazz implements an engine.Session backed by the SaluteJazz
// signaling protocol (WS + SDP with publisher/subscriber peer connection
// split). The on-wire protocol is Sber-specific; the media plane is
// straightforward WebRTC. Token acquisition lives in the auth package.
package salutejazz
import (
"context"
"errors"
"fmt"
"sync"
"sync/atomic"
"time"
"github.com/google/uuid"
"github.com/gorilla/websocket"
"github.com/openlibrecommunity/olcrtc/internal/engine"
"github.com/openlibrecommunity/olcrtc/internal/logger"
"github.com/openlibrecommunity/olcrtc/internal/protect"
"github.com/pion/webrtc/v4"
)
const (
maxDataChannelMessageSize = 12288
sendDelay = 2 * time.Millisecond
keyRoomID = "roomId"
keyEvent = "event"
keyRequestID = "requestId"
keyPayload = "payload"
credentialKeyPassword = "password"
defaultSendQueueSize = 5000
mediaReadyTimeout = 30 * time.Second
dataChannelTimeout = 30 * time.Second
wsReadTimeout = 60 * time.Second
wsHandshakeTimeout = 15 * time.Second
sendQueueTimeout = 50 * time.Millisecond
closeWaitTimeout = 2 * time.Second
subscriberOfferGap = 300 * time.Millisecond
)
var (
// ErrPublisherNotInitialized is returned when the publisher peer connection is not set up.
ErrPublisherNotInitialized = errors.New("publisher peer connection not initialized")
// ErrSubscriberMediaTimeout is returned when the subscriber media is not ready in time.
ErrSubscriberMediaTimeout = errors.New("subscriber media timeout")
// ErrDataChannelTimeout is returned when the data channel fails to open in time.
ErrDataChannelTimeout = errors.New("datachannel timeout")
// ErrDataChannelNotReady is returned when send is called before the data channel is open.
ErrDataChannelNotReady = errors.New("datachannel not ready")
// ErrSendQueueClosed is returned when send is called after Close.
ErrSendQueueClosed = errors.New("send queue closed")
// ErrSendQueueTimeout is returned when the send queue cannot accept new data in time.
ErrSendQueueTimeout = errors.New("send queue timeout")
// ErrURLRequired is returned when no connector URL was supplied.
ErrURLRequired = errors.New("salutejazz connector URL required")
// ErrRoomIDRequired is returned when no room ID was supplied.
ErrRoomIDRequired = errors.New("salutejazz room ID required")
)
// Session is the SaluteJazz engine handle.
type Session struct {
name string
connectorURL string
roomID string
password string
ws *websocket.Conn
wsMu sync.Mutex
pcSub *webrtc.PeerConnection
pcPub *webrtc.PeerConnection
dc *webrtc.DataChannel
onData func([]byte)
onReconnect func(*webrtc.DataChannel)
shouldReconnect func() bool
reconnectCh chan struct{}
closeCh chan struct{}
closed atomic.Bool
reconnecting atomic.Bool
sendQueue chan []byte
sendQueueClosed atomic.Bool
onEnded func(string)
sessionCloseCh chan struct{}
videoTrackMu sync.RWMutex
videoTracks []webrtc.TrackLocal
onVideoTrack func(*webrtc.TrackRemote, *webrtc.RTPReceiver)
subscriberReady atomic.Bool
publisherReady atomic.Bool
subscriberConn chan struct{}
publisherConn chan struct{}
wg sync.WaitGroup
groupID string
}
// New creates a new SaluteJazz engine session.
//
// cfg.URL is the SaluteJazz connector WebSocket URL. cfg.Token carries the
// room ID; cfg.Extra["password"] carries the room password. These are
// produced by the salutejazz auth provider.
func New(_ context.Context, cfg engine.Config) (engine.Session, error) {
if cfg.URL == "" {
return nil, ErrURLRequired
}
// Token field encodes the room ID for this engine.
roomID := cfg.Token
if roomID == "" {
return nil, ErrRoomIDRequired
}
password := ""
if cfg.Extra != nil {
password = cfg.Extra[credentialKeyPassword]
}
return &Session{
name: cfg.Name,
connectorURL: cfg.URL,
roomID: roomID,
password: password,
onData: cfg.OnData,
reconnectCh: make(chan struct{}, 1),
closeCh: make(chan struct{}),
sessionCloseCh: make(chan struct{}),
sendQueue: make(chan []byte, defaultSendQueueSize),
subscriberConn: make(chan struct{}),
publisherConn: make(chan struct{}),
}, nil
}
// Capabilities reports what this engine can do.
func (s *Session) Capabilities() engine.Capabilities {
return engine.Capabilities{ByteStream: true, VideoTrack: true}
}
func (s *Session) resetMediaState() {
s.subscriberReady.Store(false)
s.publisherReady.Store(false)
s.subscriberConn = make(chan struct{})
s.publisherConn = make(chan struct{})
}
func closeSignal(ch chan struct{}) {
select {
case <-ch:
default:
close(ch)
}
}
func (s *Session) hasLocalVideoTracks() bool {
s.videoTrackMu.RLock()
defer s.videoTrackMu.RUnlock()
return len(s.videoTracks) > 0
}
func (s *Session) videoTrackHandler() func(*webrtc.TrackRemote, *webrtc.RTPReceiver) {
s.videoTrackMu.RLock()
defer s.videoTrackMu.RUnlock()
return s.onVideoTrack
}
func (s *Session) attachPendingVideoTracks() error {
s.videoTrackMu.RLock()
defer s.videoTrackMu.RUnlock()
for _, track := range s.videoTracks {
if _, err := s.pcPub.AddTrack(track); err != nil {
return fmt.Errorf("failed to add track: %w", err)
}
}
return nil
}
func defaultWebRTCConfig() webrtc.Configuration {
return webrtc.Configuration{
ICEServers: []webrtc.ICEServer{},
SDPSemantics: webrtc.SDPSemanticsUnifiedPlan,
BundlePolicy: webrtc.BundlePolicyMaxBundle,
}
}
func (s *Session) buildAPI() *webrtc.API {
se := webrtc.SettingEngine{}
if protect.Protector != nil {
se.SetICEProxyDialer(protect.NewProxyDialer())
}
return webrtc.NewAPI(webrtc.WithSettingEngine(se))
}
func (s *Session) createPeerConnections(api *webrtc.API, config webrtc.Configuration) error {
var err error
s.pcSub, err = api.NewPeerConnection(config)
if err != nil {
return fmt.Errorf("create subscriber pc: %w", err)
}
s.pcSub.OnConnectionStateChange(s.onSubscriberConnectionStateChange)
s.pcSub.OnTrack(func(track *webrtc.TrackRemote, receiver *webrtc.RTPReceiver) {
if track.Kind() != webrtc.RTPCodecTypeVideo {
return
}
if cb := s.videoTrackHandler(); cb != nil {
cb(track, receiver)
}
})
s.pcPub, err = api.NewPeerConnection(config)
if err != nil {
return fmt.Errorf("create publisher pc: %w", err)
}
s.pcPub.OnConnectionStateChange(s.onPublisherConnectionStateChange)
return nil
}
func (s *Session) createDataChannel() (chan struct{}, error) {
var err error
s.dc, err = s.pcPub.CreateDataChannel("_reliable", &webrtc.DataChannelInit{
Ordered: func() *bool { v := true; return &v }(),
})
if err != nil {
return nil, fmt.Errorf("create datachannel: %w", err)
}
dcReady := make(chan struct{})
s.setupDataChannelHandlers(dcReady)
return dcReady, nil
}
func (s *Session) waitForReady(ctx context.Context, dcReady chan struct{}) error {
if dcReady != nil {
select {
case <-dcReady:
return nil
case <-time.After(dataChannelTimeout):
return ErrDataChannelTimeout
case <-ctx.Done():
return fmt.Errorf("connect canceled: %w", ctx.Err())
}
}
return s.waitForMediaReady(ctx, mediaReadyTimeout)
}
// Connect starts the WebRTC connection process.
func (s *Session) Connect(ctx context.Context) error {
s.closed.Store(false)
s.resetMediaState()
api := s.buildAPI()
config := defaultWebRTCConfig()
if err := s.createPeerConnections(api, config); err != nil {
return err
}
if err := s.attachPendingVideoTracks(); err != nil {
return err
}
var dcReady chan struct{}
if s.onData != nil {
var err error
dcReady, err = s.createDataChannel()
if err != nil {
return err
}
}
if err := s.dialWebSocket(); err != nil {
return err
}
if err := s.sendJoin(); err != nil {
return err
}
s.wg.Add(1)
go func() {
defer s.wg.Done()
s.handleSignaling(ctx)
}()
return s.waitForReady(ctx, dcReady)
}
func (s *Session) waitForMediaReady(ctx context.Context, timeout time.Duration) error {
timer := time.NewTimer(timeout)
defer timer.Stop()
select {
case <-s.subscriberConn:
case <-timer.C:
return ErrSubscriberMediaTimeout
case <-ctx.Done():
return fmt.Errorf("connect cancelled: %w", ctx.Err())
}
return nil
}
func (s *Session) dialWebSocket() error {
wsDialer := websocket.Dialer{
NetDialContext: protect.DialContext,
HandshakeTimeout: wsHandshakeTimeout,
}
ws, resp, err := wsDialer.Dial(s.connectorURL, nil)
if err != nil {
return fmt.Errorf("dial websocket: %w", err)
}
if resp != nil && resp.Body != nil {
_ = resp.Body.Close()
}
s.ws = ws
ws.SetPongHandler(func(string) error {
_ = ws.SetReadDeadline(time.Now().Add(wsReadTimeout))
return nil
})
_ = ws.SetReadDeadline(time.Now().Add(wsReadTimeout))
return nil
}
func (s *Session) sendJoin() error {
joinMsg := map[string]any{
keyRoomID: s.roomID,
keyEvent: "join",
keyRequestID: uuid.New().String(),
keyPayload: map[string]any{
"password": s.password,
"participantName": s.name,
"supportedFeatures": map[string]any{
"attachedRooms": true,
"sessionGroups": true,
"transcription": true,
},
"isSilent": false,
},
}
s.wsMu.Lock()
defer s.wsMu.Unlock()
if err := s.ws.WriteJSON(joinMsg); err != nil {
return fmt.Errorf("write join json: %w", err)
}
return nil
}
func (s *Session) setupDataChannelHandlers(dcReady chan struct{}) {
s.dc.OnOpen(func() {
logger.Verbosef("[salutejazz] Publisher DC opened: %s", s.dc.Label())
s.wg.Add(1)
go func() {
defer s.wg.Done()
s.processSendQueue()
}()
close(dcReady)
})
s.dc.OnClose(func() {
logger.Verbosef("[salutejazz] Publisher DC closed")
if !s.closed.Load() {
s.queueReconnect()
}
})
s.dc.OnMessage(func(msg webrtc.DataChannelMessage) {
s.handleIncomingMessage(msg.Data, "publisher")
})
s.pcSub.OnDataChannel(func(dc *webrtc.DataChannel) {
logger.Verbosef("[salutejazz] Received subscriber DataChannel: %s", dc.Label())
if dc.Label() != "_reliable" {
return
}
if s.onData != nil {
dc.OnMessage(func(msg webrtc.DataChannelMessage) {
s.handleIncomingMessage(msg.Data, "subscriber")
})
}
})
}
func (s *Session) onSubscriberConnectionStateChange(state webrtc.PeerConnectionState) {
switch state {
case webrtc.PeerConnectionStateConnected:
s.subscriberReady.Store(true)
closeSignal(s.subscriberConn)
case webrtc.PeerConnectionStateDisconnected, webrtc.PeerConnectionStateFailed:
s.subscriberReady.Store(false)
if !s.closed.Load() {
s.queueReconnect()
}
case webrtc.PeerConnectionStateClosed:
s.subscriberReady.Store(false)
case webrtc.PeerConnectionStateUnknown,
webrtc.PeerConnectionStateNew,
webrtc.PeerConnectionStateConnecting:
}
}
func (s *Session) onPublisherConnectionStateChange(state webrtc.PeerConnectionState) {
switch state {
case webrtc.PeerConnectionStateConnected:
s.publisherReady.Store(true)
closeSignal(s.publisherConn)
case webrtc.PeerConnectionStateDisconnected, webrtc.PeerConnectionStateFailed:
s.publisherReady.Store(false)
if !s.closed.Load() {
s.queueReconnect()
}
case webrtc.PeerConnectionStateClosed:
s.publisherReady.Store(false)
case webrtc.PeerConnectionStateUnknown,
webrtc.PeerConnectionStateNew,
webrtc.PeerConnectionStateConnecting:
}
}
func (s *Session) handleIncomingMessage(data []byte, source string) {
logger.Verbosef("[salutejazz] Received %d bytes on %s DC (raw)", len(data), source)
payload, ok := DecodeDataPacket(data)
if !ok {
logger.Debugf("[salutejazz] Failed to decode DataPacket, trying raw")
if s.onData != nil && len(data) > 0 {
s.onData(data)
}
return
}
logger.Verbosef("[salutejazz] Decoded DataPacket: %d bytes payload", len(payload))
if s.onData != nil && len(payload) > 0 {
s.onData(payload)
}
}
func (s *Session) handleSignaling(_ context.Context) {
for {
var msg map[string]any
if err := s.ws.ReadJSON(&msg); err != nil {
if !s.closed.Load() {
logger.Debugf("ws read error: %v", err)
s.queueReconnect()
}
return
}
s.updateWSDeadline()
event, _ := msg[keyEvent].(string)
payload, _ := msg[keyPayload].(map[string]any)
switch event {
case "join-response":
s.handleJoinResponse(payload)
case "media-out":
s.handleMediaOut(payload)
}
}
}
func (s *Session) handleJoinResponse(payload map[string]any) {
group, _ := payload["participantGroup"].(map[string]any)
s.groupID, _ = group["groupId"].(string)
logger.Verbosef("[salutejazz] peer joined: groupId=%s", s.groupID)
}
func (s *Session) handleMediaOut(payload map[string]any) {
method, _ := payload["method"].(string)
switch method {
case "rtc:config":
s.handleRTCConfig(payload)
case "rtc:join":
logger.Verbosef("[salutejazz] rtc:join received")
case "rtc:offer":
s.handleSubscriberOffer(payload)
case "rtc:answer":
s.handlePublisherAnswer(payload)
case "rtc:ice":
s.handleICE(payload)
}
}
func (s *Session) handleRTCConfig(payload map[string]any) {
config, _ := payload["configuration"].(map[string]any)
servers, _ := config["iceServers"].([]any)
var iceServers []webrtc.ICEServer
for _, srv := range servers {
server, _ := srv.(map[string]any)
urls, _ := server["urls"].([]any)
username, _ := server["username"].(string)
credential, _ := server["credential"].(string)
var urlStrs []string
for _, u := range urls {
if urlStr, ok := u.(string); ok && urlStr != "" {
urlStrs = append(urlStrs, urlStr)
}
}
if len(urlStrs) > 0 {
iceServers = append(iceServers, webrtc.ICEServer{
URLs: urlStrs,
Username: username,
Credential: credential,
})
}
}
if len(iceServers) > 0 {
newConfig := webrtc.Configuration{
ICEServers: iceServers,
SDPSemantics: webrtc.SDPSemanticsUnifiedPlan,
BundlePolicy: webrtc.BundlePolicyMaxBundle,
}
_ = s.pcSub.SetConfiguration(newConfig)
_ = s.pcPub.SetConfiguration(newConfig)
}
}
func (s *Session) handleSubscriberOffer(payload map[string]any) {
desc, _ := payload["description"].(map[string]any)
sdp, _ := desc["sdp"].(string)
if err := s.pcSub.SetRemoteDescription(webrtc.SessionDescription{
Type: webrtc.SDPTypeOffer,
SDP: sdp,
}); err != nil {
logger.Debugf("set remote desc error: %v", err)
return
}
answer, err := s.pcSub.CreateAnswer(nil)
if err != nil {
logger.Debugf("create answer error: %v", err)
return
}
if err := s.pcSub.SetLocalDescription(answer); err != nil {
logger.Debugf("set local desc error: %v", err)
return
}
s.wsMu.Lock()
_ = s.ws.WriteJSON(map[string]any{
keyRoomID: s.roomID,
keyEvent: "media-in",
"groupId": s.groupID,
keyRequestID: uuid.New().String(),
keyPayload: map[string]any{
"method": "rtc:answer",
"description": map[string]any{
"type": "answer",
"sdp": answer.SDP,
},
},
})
s.wsMu.Unlock()
time.Sleep(subscriberOfferGap)
s.sendPublisherOffer()
}
func (s *Session) sendPublisherOffer() {
offer, err := s.pcPub.CreateOffer(nil)
if err != nil {
logger.Debugf("create pub offer error: %v", err)
return
}
if err := s.pcPub.SetLocalDescription(offer); err != nil {
logger.Debugf("set local pub desc error: %v", err)
return
}
s.wsMu.Lock()
_ = s.ws.WriteJSON(map[string]any{
keyRoomID: s.roomID,
keyEvent: "media-in",
"groupId": s.groupID,
keyRequestID: uuid.New().String(),
keyPayload: map[string]any{
"method": "rtc:offer",
"description": map[string]any{
"type": "offer",
"sdp": offer.SDP,
},
},
})
s.wsMu.Unlock()
}
func (s *Session) handlePublisherAnswer(payload map[string]any) {
desc, _ := payload["description"].(map[string]any)
sdp, _ := desc["sdp"].(string)
if err := s.pcPub.SetRemoteDescription(webrtc.SessionDescription{
Type: webrtc.SDPTypeAnswer,
SDP: sdp,
}); err != nil {
logger.Debugf("set remote pub desc error: %v", err)
}
}
func (s *Session) handleICE(payload map[string]any) {
candidates, _ := payload["rtcIceCandidates"].([]any)
for _, c := range candidates {
cand, _ := c.(map[string]any)
candStr, _ := cand["candidate"].(string)
target, _ := cand["target"].(string)
sdpMid, _ := cand["sdpMid"].(string)
sdpMLineIndex, _ := cand["sdpMLineIndex"].(float64)
init := webrtc.ICECandidateInit{
Candidate: candStr,
SDPMid: &sdpMid,
SDPMLineIndex: func() *uint16 { v := uint16(sdpMLineIndex); return &v }(),
}
switch target {
case "SUBSCRIBER":
_ = s.pcSub.AddICECandidate(init)
case "PUBLISHER":
_ = s.pcPub.AddICECandidate(init)
}
}
}
func (s *Session) updateWSDeadline() {
s.wsMu.Lock()
if s.ws != nil {
_ = s.ws.SetReadDeadline(time.Now().Add(wsReadTimeout))
}
s.wsMu.Unlock()
}
// Send queues data for transmission.
func (s *Session) Send(data []byte) error {
if s.dc == nil || s.dc.ReadyState() != webrtc.DataChannelStateOpen {
return ErrDataChannelNotReady
}
if s.sendQueueClosed.Load() {
return ErrSendQueueClosed
}
select {
case s.sendQueue <- data:
return nil
case <-time.After(sendQueueTimeout):
return ErrSendQueueTimeout
}
}
func (s *Session) processSendQueue() {
for {
select {
case <-s.sessionCloseCh:
return
case <-s.closeCh:
return
case data := <-s.sendQueue:
if len(data) > maxDataChannelMessageSize {
logger.Debugf("[salutejazz] Message too large: %d bytes (max %d)", len(data), maxDataChannelMessageSize)
continue
}
encoded := EncodeDataPacket(data)
logger.Verbosef("[salutejazz] Sending %d bytes (encoded to %d bytes)", len(data), len(encoded))
if err := s.dc.Send(encoded); err != nil {
logger.Debugf("send error: %v", err)
s.queueReconnect()
return
}
time.Sleep(sendDelay)
}
}
}
// Close terminates the connection.
func (s *Session) Close() error {
s.closed.Store(true)
s.sendQueueClosed.Store(true)
close(s.closeCh)
done := make(chan struct{})
go func() {
s.wg.Wait()
close(done)
}()
select {
case <-done:
case <-time.After(closeWaitTimeout):
}
if s.dc != nil {
_ = s.dc.Close()
}
if s.pcPub != nil {
_ = s.pcPub.Close()
}
if s.pcSub != nil {
_ = s.pcSub.Close()
}
if s.ws != nil {
s.wsMu.Lock()
_ = s.ws.WriteControl(websocket.CloseMessage,
websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""),
time.Now().Add(time.Second))
_ = s.ws.Close()
s.wsMu.Unlock()
}
return nil
}
// AddVideoTrack adds a video track to the publisher peer connection.
func (s *Session) AddVideoTrack(track webrtc.TrackLocal) error {
s.videoTrackMu.Lock()
s.videoTracks = append(s.videoTracks, track)
s.videoTrackMu.Unlock()
if s.pcPub == nil {
return nil
}
if _, err := s.pcPub.AddTrack(track); err != nil {
return fmt.Errorf("failed to add track: %w", err)
}
return nil
}
// SetVideoTrackHandler registers a callback for remote video tracks.
func (s *Session) SetVideoTrackHandler(cb func(*webrtc.TrackRemote, *webrtc.RTPReceiver)) {
s.videoTrackMu.Lock()
defer s.videoTrackMu.Unlock()
s.onVideoTrack = cb
}
// SetReconnectCallback sets the callback for reconnection events.
func (s *Session) SetReconnectCallback(cb func(*webrtc.DataChannel)) { s.onReconnect = cb }
// SetShouldReconnect sets the policy for reconnection.
func (s *Session) SetShouldReconnect(fn func() bool) { s.shouldReconnect = fn }
// SetEndedCallback sets the callback for connection termination.
func (s *Session) SetEndedCallback(cb func(string)) { s.onEnded = cb }
// WatchConnection monitors the connection lifecycle.
func (s *Session) WatchConnection(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
case <-s.closeCh:
return
case <-s.reconnectCh:
}
}
}
// CanSend checks if data can be sent.
func (s *Session) CanSend() bool {
if s.onData == nil {
if s.hasLocalVideoTracks() {
return !s.closed.Load() && s.subscriberReady.Load() && s.publisherReady.Load()
}
return !s.closed.Load() && s.subscriberReady.Load()
}
if s.dc == nil || s.dc.ReadyState() != webrtc.DataChannelStateOpen {
return false
}
return len(s.sendQueue) < 4000
}
// GetSendQueue returns the transmission queue.
func (s *Session) GetSendQueue() chan []byte { return s.sendQueue }
// GetBufferedAmount returns the WebRTC buffered amount.
func (s *Session) GetBufferedAmount() uint64 {
if s.dc != nil {
return s.dc.BufferedAmount()
}
return 0
}
func (s *Session) queueReconnect() {
if s.closed.Load() || s.reconnecting.Load() {
return
}
if s.shouldReconnect != nil && !s.shouldReconnect() {
return
}
select {
case s.reconnectCh <- struct{}{}:
default:
}
}
func init() { //nolint:gochecknoinits // engine registration is the canonical Go pattern for plugins
engine.Register("salutejazz", New)
}

View File

@@ -1,142 +0,0 @@
package jazz
import (
"context"
"encoding/json"
"errors"
"net/http"
"net/http/httptest"
"strings"
"testing"
)
func withJazzAPIServer(t *testing.T, h http.Handler) {
t.Helper()
old := apiBase
srv := httptest.NewServer(h)
t.Cleanup(func() {
apiBase = old
srv.Close()
})
apiBase = srv.URL
}
//nolint:cyclop // table-driven test naturally has many branches
func TestCreateMeetingAndPreconnect(t *testing.T) {
withJazzAPIServer(t, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.Header.Get(headerAuthType) != authTypeAnonymous {
t.Fatalf("missing auth header: %v", r.Header)
}
switch r.URL.Path {
case "/room/create-meeting": //nolint:goconst // test literal, repetition is intentional
if r.Method != http.MethodPost {
t.Fatalf("create method = %s", r.Method)
}
_ = json.NewEncoder(w).Encode(createResponse{RoomID: "room-1", Password: "pass"}) //nolint:gosec,lll // G117: test-only struct mirroring upstream API shape
case "/room/room-1/preconnect":
if r.Method != http.MethodPost {
t.Fatalf("preconnect method = %s", r.Method)
}
_ = json.NewEncoder(w).Encode(map[string]string{"connectorUrl": "wss://connector"}) //nolint:goconst,lll // test literal, repetition is intentional
default:
http.NotFound(w, r)
}
}))
headers := map[string]string{
headerAuthType: authTypeAnonymous,
"Content-Type": "application/json",
}
created, err := createMeeting(context.Background(), headers)
if err != nil {
t.Fatalf("createMeeting() error = %v", err)
}
if created.RoomID != "room-1" || created.Password != "pass" {
t.Fatalf("createMeeting() = %+v", created)
}
connector, err := preconnect(context.Background(), "room-1", "pass", headers)
if err != nil {
t.Fatalf("preconnect() error = %v", err)
}
if connector != "wss://connector" {
t.Fatalf("preconnect() = %q", connector)
}
}
//nolint:cyclop // table-driven test naturally has many branches
func TestCreateRoomAndJoinRoom(t *testing.T) {
withJazzAPIServer(t, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
switch r.URL.Path {
case "/room/create-meeting":
_ = json.NewEncoder(w).Encode(createResponse{RoomID: "new-room", Password: "new-pass"}) //nolint:goconst,gosec,lll // test literal; G117 is a false positive for test fixtures
case "/room/new-room/preconnect", "/room/existing/preconnect":
_ = json.NewEncoder(w).Encode(map[string]string{"connectorUrl": "wss://connector"})
default:
http.NotFound(w, r)
}
}))
room, err := createRoom(context.Background())
if err != nil {
t.Fatalf("createRoom() error = %v", err)
}
if room.RoomID != "new-room" || room.Password != "new-pass" || room.ConnectorURL != "wss://connector" {
t.Fatalf("createRoom() = %+v", room)
}
room, err = joinRoom(context.Background(), "existing", "secret")
if err != nil {
t.Fatalf("joinRoom() error = %v", err)
}
if room.RoomID != "existing" || room.Password != "secret" || room.ConnectorURL != "wss://connector" {
t.Fatalf("joinRoom() = %+v", room)
}
}
func TestJazzAPIErrors(t *testing.T) {
withJazzAPIServer(t, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
switch {
case strings.Contains(r.URL.Path, "create-meeting"):
http.Error(w, "bad", http.StatusTeapot)
default:
http.Error(w, "bad", http.StatusInternalServerError)
}
}))
if _, err := createMeeting(context.Background(), nil); !errors.Is(err, errCreateRoomFailed) {
t.Fatalf("createMeeting() error = %v, want %v", err, errCreateRoomFailed)
}
if _, err := preconnect(context.Background(), "room", "pass", nil); !errors.Is(err, errPreconnectFailed) {
t.Fatalf("preconnect() error = %v, want %v", err, errPreconnectFailed)
}
}
func TestNewPeerUsesRoomAPI(t *testing.T) {
withJazzAPIServer(t, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
switch r.URL.Path {
case "/room/create-meeting":
_ = json.NewEncoder(w).Encode(createResponse{RoomID: "new-room", Password: "new-pass"}) //nolint:gosec,lll // G117: test-only struct mirroring upstream API shape
case "/room/new-room/preconnect", "/room/existing/preconnect":
_ = json.NewEncoder(w).Encode(map[string]string{"connectorUrl": "wss://connector"})
default:
http.NotFound(w, r)
}
}))
created, err := NewPeer(context.Background(), "any", "peer", nil)
if err != nil {
t.Fatalf("NewPeer(create) error = %v", err)
}
if created.roomInfo.RoomID != "new-room" {
t.Fatalf("created room = %+v", created.roomInfo)
}
joined, err := NewPeer(context.Background(), "existing:secret", "peer", nil)
if err != nil {
t.Fatalf("NewPeer(join) error = %v", err)
}
if joined.roomInfo.RoomID != "existing" || joined.roomInfo.Password != "secret" {
t.Fatalf("joined room = %+v", joined.roomInfo)
}
}

View File

@@ -1,70 +0,0 @@
package jazz
import (
"bytes"
"errors"
"io"
"testing"
)
func TestDataPacketRoundTrip(t *testing.T) {
payload := []byte("hello jazz")
raw := EncodeDataPacket(payload)
got, ok := DecodeDataPacket(raw)
if !ok {
t.Fatal("DecodeDataPacket() ok = false")
}
if !bytes.Equal(got, payload) {
t.Fatalf("DecodeDataPacket() = %q, want %q", got, payload)
}
}
func TestDecodeDataPacketRejectsMalformedPackets(t *testing.T) {
tests := [][]byte{
nil,
{0xff},
encodeField(1, 0, encodeVarint(0)),
{byte(2<<3 | 2), 10, 1},
{byte(3<<3 | 7), 0},
}
for _, raw := range tests {
if payload, ok := DecodeDataPacket(raw); ok {
t.Fatalf("DecodeDataPacket(%v) = (%q, true), want false", raw, payload)
}
}
}
func TestParseFieldsSkipsSupportedNonTargetWireTypes(t *testing.T) {
data := encodeField(1, 0, encodeVarint(150))
data = append(data, encodeField(3, 1, []byte("12345678"))...)
data = append(data, encodeField(4, 5, []byte("1234"))...)
data = append(data, encodeField(2, 2, []byte("target"))...)
got, ok := parseFields(data, 2)
if !ok || string(got) != "target" {
t.Fatalf("parseFields() = (%q, %v), want target", got, ok)
}
}
func TestByteReader(t *testing.T) {
r := &byteReader{data: []byte{1, 2, 3}}
b, err := r.ReadByte()
if err != nil || b != 1 {
t.Fatalf("ReadByte() = (%d, %v), want (1, nil)", b, err)
}
buf := make([]byte, 4)
n, err := r.Read(buf)
if err != nil || n != 2 || !bytes.Equal(buf[:n], []byte{2, 3}) {
t.Fatalf("Read() = (%d, %v, %v), want two bytes", n, err, buf[:n])
}
if _, err := r.ReadByte(); !errors.Is(err, io.EOF) {
t.Fatalf("ReadByte() error = %v, want EOF", err)
}
if n, err := r.Read(buf); !errors.Is(err, io.EOF) || n != 0 {
t.Fatalf("Read() = (%d, %v), want (0, EOF)", n, err)
}
}

View File

@@ -1,785 +0,0 @@
// Package jazz implements the SaluteJazz WebRTC provider.
package jazz
import (
"context"
"errors"
"fmt"
"log"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/google/uuid"
"github.com/gorilla/websocket"
"github.com/openlibrecommunity/olcrtc/internal/logger"
"github.com/openlibrecommunity/olcrtc/internal/protect"
"github.com/openlibrecommunity/olcrtc/internal/provider"
"github.com/pion/webrtc/v4"
)
const (
maxDataChannelMessageSize = 12288
sendDelay = 2 * time.Millisecond
keyRoomID = "roomId"
keyEvent = "event"
keyRequestID = "requestId"
keyPayload = "payload"
)
var (
// ErrPublisherNotInitialized is returned when the publisher peer connection is not set up.
ErrPublisherNotInitialized = errors.New("publisher peer connection not initialized")
// ErrSubscriberMediaTimeout is returned when the subscriber media is not ready within the timeout period.
ErrSubscriberMediaTimeout = errors.New("subscriber media timeout")
)
// Peer represents a SaluteJazz WebRTC connection.
type Peer struct {
name string
roomInfo *RoomInfo
ws *websocket.Conn
wsMu sync.Mutex
pcSub *webrtc.PeerConnection
pcPub *webrtc.PeerConnection
dc *webrtc.DataChannel
onData func([]byte)
onReconnect func(*webrtc.DataChannel)
shouldReconnect func() bool
reconnectCh chan struct{}
closeCh chan struct{}
closed atomic.Bool
reconnecting atomic.Bool
sendQueue chan []byte
sendQueueClosed atomic.Bool
onEnded func(string)
sessionCloseCh chan struct{}
videoTrackMu sync.RWMutex
videoTracks []webrtc.TrackLocal
onVideoTrack func(*webrtc.TrackRemote, *webrtc.RTPReceiver)
subscriberReady atomic.Bool
publisherReady atomic.Bool
subscriberConn chan struct{}
publisherConn chan struct{}
wg sync.WaitGroup
groupID string
}
// NewPeer creates a new Jazz provider peer.
func NewPeer(ctx context.Context, roomID, name string, onData func([]byte)) (*Peer, error) {
var roomInfo *RoomInfo
var err error
if roomID == "" || roomID == "any" || roomID == "dummy" {
roomInfo, err = createRoom(ctx)
if err != nil {
return nil, fmt.Errorf("create room: %w", err)
}
log.Printf("Jazz room created: %s:%s", roomInfo.RoomID, roomInfo.Password)
log.Printf("To connect client use: -id \"%s:%s\"", roomInfo.RoomID, roomInfo.Password)
} else {
var password string
parts := strings.Split(roomID, ":")
if len(parts) == 2 {
roomID = parts[0]
password = parts[1]
}
roomInfo, err = joinRoom(ctx, roomID, password)
if err != nil {
return nil, fmt.Errorf("join room: %w", err)
}
log.Printf("Jazz joining room: %s", roomInfo.RoomID)
}
return &Peer{
name: name,
roomInfo: roomInfo,
onData: onData,
reconnectCh: make(chan struct{}, 1),
closeCh: make(chan struct{}),
sessionCloseCh: make(chan struct{}),
sendQueue: make(chan []byte, 5000),
subscriberConn: make(chan struct{}),
publisherConn: make(chan struct{}),
}, nil
}
func (p *Peer) resetMediaState() {
p.subscriberReady.Store(false)
p.publisherReady.Store(false)
p.subscriberConn = make(chan struct{})
p.publisherConn = make(chan struct{})
}
func closeSignal(ch chan struct{}) {
select {
case <-ch:
default:
close(ch)
}
}
func (p *Peer) hasLocalVideoTracks() bool {
p.videoTrackMu.RLock()
defer p.videoTrackMu.RUnlock()
return len(p.videoTracks) > 0
}
func (p *Peer) videoTrackHandler() func(*webrtc.TrackRemote, *webrtc.RTPReceiver) {
p.videoTrackMu.RLock()
defer p.videoTrackMu.RUnlock()
return p.onVideoTrack
}
func (p *Peer) attachPendingVideoTracks() error {
p.videoTrackMu.RLock()
defer p.videoTrackMu.RUnlock()
for _, track := range p.videoTracks {
if _, err := p.pcPub.AddTrack(track); err != nil {
return fmt.Errorf("failed to add track: %w", err)
}
}
return nil
}
func defaultWebRTCConfig() webrtc.Configuration {
return webrtc.Configuration{
ICEServers: []webrtc.ICEServer{},
SDPSemantics: webrtc.SDPSemanticsUnifiedPlan,
BundlePolicy: webrtc.BundlePolicyMaxBundle,
}
}
func (p *Peer) buildAPI() *webrtc.API {
se := webrtc.SettingEngine{}
if protect.Protector != nil {
se.SetICEProxyDialer(protect.NewProxyDialer())
}
return webrtc.NewAPI(webrtc.WithSettingEngine(se))
}
func (p *Peer) createPeerConnections(api *webrtc.API, config webrtc.Configuration) error {
var err error
p.pcSub, err = api.NewPeerConnection(config)
if err != nil {
return fmt.Errorf("create subscriber pc: %w", err)
}
p.pcSub.OnConnectionStateChange(p.onSubscriberConnectionStateChange)
p.pcSub.OnTrack(func(track *webrtc.TrackRemote, receiver *webrtc.RTPReceiver) {
if track.Kind() != webrtc.RTPCodecTypeVideo {
return
}
if cb := p.videoTrackHandler(); cb != nil {
cb(track, receiver)
}
})
p.pcPub, err = api.NewPeerConnection(config)
if err != nil {
return fmt.Errorf("create publisher pc: %w", err)
}
p.pcPub.OnConnectionStateChange(p.onPublisherConnectionStateChange)
return nil
}
func (p *Peer) createDataChannel() (chan struct{}, error) {
var err error
p.dc, err = p.pcPub.CreateDataChannel("_reliable", &webrtc.DataChannelInit{
Ordered: func() *bool { v := true; return &v }(),
})
if err != nil {
return nil, fmt.Errorf("create datachannel: %w", err)
}
dcReady := make(chan struct{})
p.setupDataChannelHandlers(dcReady)
return dcReady, nil
}
func (p *Peer) waitForReady(ctx context.Context, dcReady chan struct{}) error {
if dcReady != nil {
select {
case <-dcReady:
return nil
case <-time.After(30 * time.Second):
return provider.ErrDataChannelTimeout
case <-ctx.Done():
return fmt.Errorf("connect canceled: %w", ctx.Err())
}
}
return p.waitForMediaReady(ctx, 30*time.Second)
}
// Connect starts the WebRTC connection process.
func (p *Peer) Connect(ctx context.Context) error {
p.closed.Store(false)
p.resetMediaState()
api := p.buildAPI()
config := defaultWebRTCConfig()
if err := p.createPeerConnections(api, config); err != nil {
return err
}
if err := p.attachPendingVideoTracks(); err != nil {
return err
}
var dcReady chan struct{}
if p.onData != nil {
var err error
dcReady, err = p.createDataChannel()
if err != nil {
return err
}
}
if err := p.dialWebSocket(); err != nil {
return err
}
if err := p.sendJoin(); err != nil {
return err
}
p.wg.Add(1)
go func() {
defer p.wg.Done()
p.handleSignaling(ctx)
}()
return p.waitForReady(ctx, dcReady)
}
func (p *Peer) waitForMediaReady(ctx context.Context, timeout time.Duration) error {
timer := time.NewTimer(timeout)
defer timer.Stop()
select {
case <-p.subscriberConn:
case <-timer.C:
return ErrSubscriberMediaTimeout
case <-ctx.Done():
return fmt.Errorf("connect cancelled: %w", ctx.Err())
}
return nil
}
func (p *Peer) dialWebSocket() error {
wsDialer := websocket.Dialer{
NetDialContext: protect.DialContext,
HandshakeTimeout: 15 * time.Second,
}
ws, resp, err := wsDialer.Dial(p.roomInfo.ConnectorURL, nil)
if err != nil {
return fmt.Errorf("dial websocket: %w", err)
}
if resp != nil && resp.Body != nil {
_ = resp.Body.Close()
}
p.ws = ws
ws.SetPongHandler(func(string) error {
_ = ws.SetReadDeadline(time.Now().Add(60 * time.Second))
return nil
})
_ = ws.SetReadDeadline(time.Now().Add(60 * time.Second))
return nil
}
func (p *Peer) sendJoin() error {
joinMsg := map[string]any{
keyRoomID: p.roomInfo.RoomID,
keyEvent: "join",
keyRequestID: uuid.New().String(),
keyPayload: map[string]any{
"password": p.roomInfo.Password,
"participantName": p.name,
"supportedFeatures": map[string]any{
"attachedRooms": true,
"sessionGroups": true,
"transcription": true,
},
"isSilent": false,
},
}
p.wsMu.Lock()
defer p.wsMu.Unlock()
if err := p.ws.WriteJSON(joinMsg); err != nil {
return fmt.Errorf("write join json: %w", err)
}
return nil
}
func (p *Peer) setupDataChannelHandlers(dcReady chan struct{}) {
p.dc.OnOpen(func() {
logger.Verbosef("[Jazz] Publisher DC opened: %s", p.dc.Label())
p.wg.Add(1)
go func() {
defer p.wg.Done()
p.processSendQueue()
}()
close(dcReady)
})
p.dc.OnClose(func() {
logger.Verbosef("[Jazz] Publisher DC closed")
if !p.closed.Load() {
p.queueReconnect()
}
})
p.dc.OnMessage(func(msg webrtc.DataChannelMessage) {
p.handleIncomingMessage(msg.Data, "publisher")
})
p.pcSub.OnDataChannel(func(dc *webrtc.DataChannel) {
logger.Verbosef("[Jazz] Received subscriber DataChannel: %s", dc.Label())
if dc.Label() != "_reliable" {
return
}
if p.onData != nil {
dc.OnMessage(func(msg webrtc.DataChannelMessage) {
p.handleIncomingMessage(msg.Data, "subscriber")
})
}
})
}
func (p *Peer) onSubscriberConnectionStateChange(state webrtc.PeerConnectionState) {
switch state {
case webrtc.PeerConnectionStateConnected:
p.subscriberReady.Store(true)
closeSignal(p.subscriberConn)
case webrtc.PeerConnectionStateDisconnected, webrtc.PeerConnectionStateFailed:
p.subscriberReady.Store(false)
if !p.closed.Load() {
p.queueReconnect()
}
case webrtc.PeerConnectionStateClosed:
p.subscriberReady.Store(false)
case webrtc.PeerConnectionStateUnknown,
webrtc.PeerConnectionStateNew,
webrtc.PeerConnectionStateConnecting:
}
}
func (p *Peer) onPublisherConnectionStateChange(state webrtc.PeerConnectionState) {
switch state {
case webrtc.PeerConnectionStateConnected:
p.publisherReady.Store(true)
closeSignal(p.publisherConn)
case webrtc.PeerConnectionStateDisconnected, webrtc.PeerConnectionStateFailed:
p.publisherReady.Store(false)
if !p.closed.Load() {
p.queueReconnect()
}
case webrtc.PeerConnectionStateClosed:
p.publisherReady.Store(false)
case webrtc.PeerConnectionStateUnknown,
webrtc.PeerConnectionStateNew,
webrtc.PeerConnectionStateConnecting:
}
}
func (p *Peer) handleIncomingMessage(data []byte, source string) {
logger.Verbosef("[Jazz] Received %d bytes on %s DC (raw)", len(data), source)
payload, ok := DecodeDataPacket(data)
if !ok {
logger.Debugf("[Jazz] Failed to decode DataPacket, trying raw")
if p.onData != nil && len(data) > 0 {
p.onData(data)
}
return
}
logger.Verbosef("[Jazz] Decoded DataPacket: %d bytes payload", len(payload))
if p.onData != nil && len(payload) > 0 {
p.onData(payload)
}
}
func (p *Peer) handleSignaling(_ context.Context) {
for {
var msg map[string]any
if err := p.ws.ReadJSON(&msg); err != nil {
if !p.closed.Load() {
logger.Debugf("ws read error: %v", err)
p.queueReconnect()
}
return
}
p.updateWSDeadline()
event, _ := msg[keyEvent].(string)
payload, _ := msg[keyPayload].(map[string]any)
switch event {
case "join-response":
p.handleJoinResponse(payload)
case "media-out":
p.handleMediaOut(payload)
}
}
}
func (p *Peer) handleJoinResponse(payload map[string]any) {
group, _ := payload["participantGroup"].(map[string]any)
p.groupID, _ = group["groupId"].(string)
logger.Verbosef("Jazz peer joined: groupId=%s", p.groupID)
}
func (p *Peer) handleMediaOut(payload map[string]any) {
method, _ := payload["method"].(string)
switch method {
case "rtc:config":
p.handleRTCConfig(payload)
case "rtc:join":
logger.Verbosef("Jazz rtc:join received")
case "rtc:offer":
p.handleSubscriberOffer(payload)
case "rtc:answer":
p.handlePublisherAnswer(payload)
case "rtc:ice":
p.handleICE(payload)
}
}
func (p *Peer) handleRTCConfig(payload map[string]any) {
config, _ := payload["configuration"].(map[string]any)
servers, _ := config["iceServers"].([]any)
var iceServers []webrtc.ICEServer
for _, s := range servers {
server, _ := s.(map[string]any)
urls, _ := server["urls"].([]any)
username, _ := server["username"].(string)
credential, _ := server["credential"].(string)
var urlStrs []string
for _, u := range urls {
if urlStr, ok := u.(string); ok && urlStr != "" {
urlStrs = append(urlStrs, urlStr)
}
}
if len(urlStrs) > 0 {
iceServers = append(iceServers, webrtc.ICEServer{
URLs: urlStrs,
Username: username,
Credential: credential,
})
}
}
if len(iceServers) > 0 {
newConfig := webrtc.Configuration{
ICEServers: iceServers,
SDPSemantics: webrtc.SDPSemanticsUnifiedPlan,
BundlePolicy: webrtc.BundlePolicyMaxBundle,
}
_ = p.pcSub.SetConfiguration(newConfig)
_ = p.pcPub.SetConfiguration(newConfig)
}
}
func (p *Peer) handleSubscriberOffer(payload map[string]any) {
desc, _ := payload["description"].(map[string]any)
sdp, _ := desc["sdp"].(string)
if err := p.pcSub.SetRemoteDescription(webrtc.SessionDescription{
Type: webrtc.SDPTypeOffer,
SDP: sdp,
}); err != nil {
logger.Debugf("set remote desc error: %v", err)
return
}
answer, err := p.pcSub.CreateAnswer(nil)
if err != nil {
logger.Debugf("create answer error: %v", err)
return
}
if err := p.pcSub.SetLocalDescription(answer); err != nil {
logger.Debugf("set local desc error: %v", err)
return
}
p.wsMu.Lock()
_ = p.ws.WriteJSON(map[string]any{
keyRoomID: p.roomInfo.RoomID,
keyEvent: "media-in",
"groupId": p.groupID,
keyRequestID: uuid.New().String(),
keyPayload: map[string]any{
"method": "rtc:answer",
"description": map[string]any{
"type": "answer",
"sdp": answer.SDP,
},
},
})
p.wsMu.Unlock()
time.Sleep(300 * time.Millisecond)
p.sendPublisherOffer()
}
func (p *Peer) sendPublisherOffer() {
offer, err := p.pcPub.CreateOffer(nil)
if err != nil {
logger.Debugf("create pub offer error: %v", err)
return
}
if err := p.pcPub.SetLocalDescription(offer); err != nil {
logger.Debugf("set local pub desc error: %v", err)
return
}
p.wsMu.Lock()
_ = p.ws.WriteJSON(map[string]any{
keyRoomID: p.roomInfo.RoomID,
keyEvent: "media-in",
"groupId": p.groupID,
keyRequestID: uuid.New().String(),
keyPayload: map[string]any{
"method": "rtc:offer",
"description": map[string]any{
"type": "offer",
"sdp": offer.SDP,
},
},
})
p.wsMu.Unlock()
}
func (p *Peer) handlePublisherAnswer(payload map[string]any) {
desc, _ := payload["description"].(map[string]any)
sdp, _ := desc["sdp"].(string)
if err := p.pcPub.SetRemoteDescription(webrtc.SessionDescription{
Type: webrtc.SDPTypeAnswer,
SDP: sdp,
}); err != nil {
logger.Debugf("set remote pub desc error: %v", err)
}
}
func (p *Peer) handleICE(payload map[string]any) {
candidates, _ := payload["rtcIceCandidates"].([]any)
for _, c := range candidates {
cand, _ := c.(map[string]any)
candStr, _ := cand["candidate"].(string)
target, _ := cand["target"].(string)
sdpMid, _ := cand["sdpMid"].(string)
sdpMLineIndex, _ := cand["sdpMLineIndex"].(float64)
init := webrtc.ICECandidateInit{
Candidate: candStr,
SDPMid: &sdpMid,
SDPMLineIndex: func() *uint16 { v := uint16(sdpMLineIndex); return &v }(),
}
switch target {
case "SUBSCRIBER":
_ = p.pcSub.AddICECandidate(init)
case "PUBLISHER":
_ = p.pcPub.AddICECandidate(init)
}
}
}
func (p *Peer) updateWSDeadline() {
p.wsMu.Lock()
if p.ws != nil {
_ = p.ws.SetReadDeadline(time.Now().Add(60 * time.Second))
}
p.wsMu.Unlock()
}
// Send queues data for transmission.
func (p *Peer) Send(data []byte) error {
if p.dc == nil || p.dc.ReadyState() != webrtc.DataChannelStateOpen {
return provider.ErrDataChannelNotReady
}
if p.sendQueueClosed.Load() {
return provider.ErrSendQueueClosed
}
select {
case p.sendQueue <- data:
return nil
case <-time.After(50 * time.Millisecond):
return provider.ErrSendQueueTimeout
}
}
func (p *Peer) processSendQueue() {
for {
select {
case <-p.sessionCloseCh:
return
case <-p.closeCh:
return
case data := <-p.sendQueue:
if len(data) > maxDataChannelMessageSize {
logger.Debugf("[Jazz] Message too large: %d bytes (max %d)", len(data), maxDataChannelMessageSize)
continue
}
encoded := EncodeDataPacket(data)
logger.Verbosef("[Jazz] Sending %d bytes (encoded to %d bytes)", len(data), len(encoded))
if err := p.dc.Send(encoded); err != nil {
logger.Debugf("send error: %v", err)
p.queueReconnect()
return
}
time.Sleep(sendDelay)
}
}
}
// Close terminates the connection and releases resources.
func (p *Peer) Close() error {
p.closed.Store(true)
p.sendQueueClosed.Store(true)
close(p.closeCh)
done := make(chan struct{})
go func() {
p.wg.Wait()
close(done)
}()
select {
case <-done:
case <-time.After(2 * time.Second):
}
if p.dc != nil {
_ = p.dc.Close()
}
if p.pcPub != nil {
_ = p.pcPub.Close()
}
if p.pcSub != nil {
_ = p.pcSub.Close()
}
if p.ws != nil {
p.wsMu.Lock()
_ = p.ws.WriteControl(websocket.CloseMessage,
websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""),
time.Now().Add(time.Second))
_ = p.ws.Close()
p.wsMu.Unlock()
}
return nil
}
// AddVideoTrack adds a video track to the publisher peer connection.
func (p *Peer) AddVideoTrack(track webrtc.TrackLocal) error {
p.videoTrackMu.Lock()
p.videoTracks = append(p.videoTracks, track)
p.videoTrackMu.Unlock()
if p.pcPub == nil {
return nil
}
if _, err := p.pcPub.AddTrack(track); err != nil {
return fmt.Errorf("failed to add track: %w", err)
}
return nil
}
// SetVideoTrackHandler registers a callback for remote video tracks.
func (p *Peer) SetVideoTrackHandler(cb func(*webrtc.TrackRemote, *webrtc.RTPReceiver)) {
p.videoTrackMu.Lock()
defer p.videoTrackMu.Unlock()
p.onVideoTrack = cb
}
// SetReconnectCallback sets the callback for reconnection events.
func (p *Peer) SetReconnectCallback(cb func(*webrtc.DataChannel)) {
p.onReconnect = cb
}
// SetShouldReconnect sets the policy for reconnection.
func (p *Peer) SetShouldReconnect(fn func() bool) {
p.shouldReconnect = fn
}
// SetEndedCallback sets the callback for connection termination.
func (p *Peer) SetEndedCallback(cb func(string)) {
p.onEnded = cb
}
// WatchConnection monitors the connection lifecycle.
func (p *Peer) WatchConnection(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
case <-p.closeCh:
return
case <-p.reconnectCh:
}
}
}
// CanSend checks if data can be sent.
func (p *Peer) CanSend() bool {
if p.onData == nil {
if p.hasLocalVideoTracks() {
return !p.closed.Load() && p.subscriberReady.Load() && p.publisherReady.Load()
}
return !p.closed.Load() && p.subscriberReady.Load()
}
if p.dc == nil || p.dc.ReadyState() != webrtc.DataChannelStateOpen {
return false
}
return len(p.sendQueue) < 4000
}
// GetSendQueue returns the transmission queue.
func (p *Peer) GetSendQueue() chan []byte {
return p.sendQueue
}
// GetBufferedAmount returns the WebRTC buffered amount.
func (p *Peer) GetBufferedAmount() uint64 {
if p.dc != nil {
return p.dc.BufferedAmount()
}
return 0
}
func (p *Peer) queueReconnect() {
if p.closed.Load() || p.reconnecting.Load() {
return
}
if p.shouldReconnect != nil && !p.shouldReconnect() {
return
}
select {
case p.reconnectCh <- struct{}{}:
default:
}
}

View File

@@ -1,113 +0,0 @@
package jazz
import (
"context"
"errors"
"testing"
"github.com/openlibrecommunity/olcrtc/internal/provider"
"github.com/pion/webrtc/v4"
)
//nolint:cyclop // table-driven test naturally has many branches
func TestPeerStateHelpers(t *testing.T) {
p := &Peer{
reconnectCh: make(chan struct{}, 1),
closeCh: make(chan struct{}),
sessionCloseCh: make(chan struct{}),
sendQueue: make(chan []byte, 1),
subscriberConn: make(chan struct{}),
publisherConn: make(chan struct{}),
}
p.resetMediaState()
if p.subscriberReady.Load() || p.publisherReady.Load() || p.subscriberConn == nil || p.publisherConn == nil {
t.Fatal("resetMediaState() did not reset readiness")
}
if p.hasLocalVideoTracks() {
t.Fatal("hasLocalVideoTracks() = true without tracks")
}
if err := p.AddVideoTrack(nil); err != nil {
t.Fatalf("AddVideoTrack(nil) error = %v", err)
}
if !p.hasLocalVideoTracks() {
t.Fatal("hasLocalVideoTracks() = false after AddVideoTrack")
}
p.SetVideoTrackHandler(func(*webrtc.TrackRemote, *webrtc.RTPReceiver) {})
if p.videoTrackHandler() == nil {
t.Fatal("videoTrackHandler() = nil")
}
cfg := defaultWebRTCConfig()
if cfg.SDPSemantics != webrtc.SDPSemanticsUnifiedPlan || cfg.BundlePolicy != webrtc.BundlePolicyMaxBundle {
t.Fatalf("defaultWebRTCConfig() = %+v", cfg)
}
if p.buildAPI() == nil {
t.Fatal("buildAPI() returned nil")
}
}
func TestPeerCallbacksQueueReconnectAndClose(t *testing.T) {
p := &Peer{
reconnectCh: make(chan struct{}, 1),
closeCh: make(chan struct{}),
sessionCloseCh: make(chan struct{}),
sendQueue: make(chan []byte, 1),
}
p.SetReconnectCallback(func(*webrtc.DataChannel) {})
p.SetShouldReconnect(func() bool { return true })
p.SetEndedCallback(func(string) {})
if p.onReconnect == nil || p.shouldReconnect == nil || p.onEnded == nil {
t.Fatal("callbacks were not stored")
}
p.queueReconnect()
select {
case <-p.reconnectCh:
default:
t.Fatal("queueReconnect() did not enqueue")
}
p.SetShouldReconnect(func() bool { return false })
p.queueReconnect()
select {
case <-p.reconnectCh:
t.Fatal("queueReconnect() enqueued despite policy=false")
default:
}
done := make(chan struct{})
go func() {
p.WatchConnection(context.Background())
close(done)
}()
if err := p.Close(); err != nil {
t.Fatalf("Close() error = %v", err)
}
<-done
if err := p.Send([]byte("closed")); !errors.Is(err, provider.ErrDataChannelNotReady) {
t.Fatalf("Send() error = %v, want datachannel not ready", err)
}
}
func TestPeerCanSendVideoOnlyModes(t *testing.T) {
p := &Peer{sendQueue: make(chan []byte, 1)}
p.subscriberReady.Store(true)
if !p.CanSend() {
t.Fatal("CanSend() = false for subscriber-ready peer without local video")
}
_ = p.AddVideoTrack(nil)
if p.CanSend() {
t.Fatal("CanSend() = true with local video but publisher not ready")
}
p.publisherReady.Store(true)
if !p.CanSend() {
t.Fatal("CanSend() = false with subscriber and publisher ready")
}
p.closed.Store(true)
if p.CanSend() {
t.Fatal("CanSend() = true for closed peer")
}
}

View File

@@ -1,84 +0,0 @@
// Package jazz implements the SaluteJazz WebRTC provider.
package jazz
import (
"context"
"fmt"
"github.com/openlibrecommunity/olcrtc/internal/provider"
"github.com/pion/webrtc/v4"
)
type jazzProvider struct {
peer *Peer
}
// New creates a new SaluteJazz provider instance.
func New(ctx context.Context, cfg provider.Config) (provider.Provider, error) {
peer, err := NewPeer(ctx, cfg.RoomURL, cfg.Name, cfg.OnData)
if err != nil {
return nil, fmt.Errorf("create jazz peer: %w", err)
}
return &jazzProvider{peer: peer}, nil
}
// Connect starts the provider connection.
func (j *jazzProvider) Connect(ctx context.Context) error {
return j.peer.Connect(ctx)
}
// Send transmits data to the room.
func (j *jazzProvider) Send(data []byte) error {
return j.peer.Send(data)
}
// Close terminates the provider connection.
func (j *jazzProvider) Close() error {
return j.peer.Close()
}
// SetReconnectCallback sets the function to call on reconnection.
func (j *jazzProvider) SetReconnectCallback(cb func(*webrtc.DataChannel)) {
j.peer.SetReconnectCallback(cb)
}
// SetShouldReconnect sets the function to determine if reconnection should occur.
func (j *jazzProvider) SetShouldReconnect(fn func() bool) {
j.peer.SetShouldReconnect(fn)
}
// SetEndedCallback sets the function to call when the session ends.
func (j *jazzProvider) SetEndedCallback(cb func(string)) {
j.peer.SetEndedCallback(cb)
}
// WatchConnection monitors the provider connection state.
func (j *jazzProvider) WatchConnection(ctx context.Context) {
j.peer.WatchConnection(ctx)
}
// CanSend checks if the provider is ready to transmit data.
func (j *jazzProvider) CanSend() bool {
return j.peer.CanSend()
}
// GetSendQueue returns the data transmission queue.
func (j *jazzProvider) GetSendQueue() chan []byte {
return j.peer.GetSendQueue()
}
// GetBufferedAmount returns the current WebRTC buffered amount.
func (j *jazzProvider) GetBufferedAmount() uint64 {
return j.peer.GetBufferedAmount()
}
// AddVideoTrack adds a video track to the jazz connection.
func (j *jazzProvider) AddVideoTrack(track webrtc.TrackLocal) error {
return j.peer.AddVideoTrack(track)
}
// SetVideoTrackHandler registers a callback for subscribed remote video tracks.
func (j *jazzProvider) SetVideoTrackHandler(cb func(*webrtc.TrackRemote, *webrtc.RTPReceiver)) {
j.peer.SetVideoTrackHandler(cb)
}

View File

@@ -1,51 +0,0 @@
package jazz
import (
"context"
"errors"
"testing"
"github.com/openlibrecommunity/olcrtc/internal/provider"
"github.com/pion/webrtc/v4"
)
func TestJazzProviderForwardsPeerMethods(t *testing.T) {
peer := &Peer{
reconnectCh: make(chan struct{}, 1),
closeCh: make(chan struct{}),
sessionCloseCh: make(chan struct{}),
sendQueue: make(chan []byte, 1),
}
p := &jazzProvider{peer: peer}
p.SetReconnectCallback(func(*webrtc.DataChannel) {})
p.SetShouldReconnect(func() bool { return true })
p.SetEndedCallback(func(string) {})
p.SetVideoTrackHandler(func(*webrtc.TrackRemote, *webrtc.RTPReceiver) {})
if peer.onReconnect == nil || peer.shouldReconnect == nil || peer.onEnded == nil || peer.onVideoTrack == nil {
t.Fatal("callbacks were not forwarded")
}
if p.GetSendQueue() != peer.sendQueue {
t.Fatal("GetSendQueue() did not forward")
}
if p.GetBufferedAmount() != 0 {
t.Fatal("GetBufferedAmount() != 0 with nil datachannel")
}
if err := p.AddVideoTrack(nil); err != nil {
t.Fatalf("AddVideoTrack(nil) error = %v", err)
}
if err := p.Send([]byte("x")); !errors.Is(err, provider.ErrDataChannelNotReady) {
t.Fatalf("Send() error = %v, want datachannel not ready", err)
}
done := make(chan struct{})
go func() {
p.WatchConnection(context.Background())
close(done)
}()
if err := p.Close(); err != nil {
t.Fatalf("Close() error = %v", err)
}
<-done
}