diff --git a/internal/app/session/session.go b/internal/app/session/session.go index 73bb528..a72ae54 100644 --- a/internal/app/session/session.go +++ b/internal/app/session/session.go @@ -9,6 +9,7 @@ import ( "time" "github.com/openlibrecommunity/olcrtc/internal/auth" + authSaluteJazz "github.com/openlibrecommunity/olcrtc/internal/auth/salutejazz" authWBStream "github.com/openlibrecommunity/olcrtc/internal/auth/wbstream" "github.com/openlibrecommunity/olcrtc/internal/carrier" "github.com/openlibrecommunity/olcrtc/internal/carrier/builtin" @@ -16,7 +17,6 @@ import ( "github.com/openlibrecommunity/olcrtc/internal/link" "github.com/openlibrecommunity/olcrtc/internal/link/direct" "github.com/openlibrecommunity/olcrtc/internal/names" - "github.com/openlibrecommunity/olcrtc/internal/provider/jazz" "github.com/openlibrecommunity/olcrtc/internal/server" "github.com/openlibrecommunity/olcrtc/internal/transport" "github.com/openlibrecommunity/olcrtc/internal/transport/datachannel" @@ -449,14 +449,18 @@ func genRetry(ctx context.Context, fn func(context.Context) error) error { func Gen(ctx context.Context, cfg Config, out func(string)) error { switch cfg.Carrier { case carrierJazz: + creator, ok := any(authSaluteJazz.Provider{}).(auth.RoomCreator) + if !ok { + return fmt.Errorf("%w: jazz auth provider does not implement RoomCreator", ErrUnsupportedCarrier) + } for i := range cfg.Amount { var roomID string err := genRetry(ctx, func(ctx context.Context) error { - info, err := jazz.CreateRoom(ctx) + var err error + roomID, err = creator.CreateRoom(ctx, auth.Config{Name: names.Generate()}) if err != nil { - return fmt.Errorf("jazz.CreateRoom: %w", err) + return fmt.Errorf("jazz CreateRoom: %w", err) } - roomID = info.RoomID return nil }) if err != nil { diff --git a/internal/provider/jazz/api.go b/internal/auth/salutejazz/api.go similarity index 81% rename from internal/provider/jazz/api.go rename to internal/auth/salutejazz/api.go index a4250b1..3d214ad 100644 --- a/internal/provider/jazz/api.go +++ b/internal/auth/salutejazz/api.go @@ -1,5 +1,7 @@ -// Package jazz implements the SaluteJazz WebRTC provider. -package jazz +// Package salutejazz is the auth provider for the SaluteJazz service. It +// creates / joins a Jazz room over HTTP and returns the connector +// WebSocket URL, room ID and password that the salutejazz engine consumes. +package salutejazz import ( "bytes" @@ -20,13 +22,13 @@ const ( contentTypeJSON = "application/json" ) -var apiBase = "https://bk.salutejazz.ru" //nolint:gochecknoglobals // package-level state intentional +var apiBase = "https://bk.salutejazz.ru" //nolint:gochecknoglobals // overridable base URL for tests -// RoomInfo contains connection details for a SaluteJazz room. -type RoomInfo struct { - RoomID string `json:"roomId"` - Password string `json:"password"` - ConnectorURL string `json:"connectorUrl"` +// roomInfo contains connection details for a SaluteJazz room. +type roomInfo struct { + RoomID string + Password string + ConnectorURL string } var ( @@ -34,14 +36,17 @@ var ( errPreconnectFailed = errors.New("preconnect failed") ) -func createRoom(ctx context.Context) (*RoomInfo, error) { - clientID := uuid.New().String() - headers := map[string]string{ - "X-Jazz-ClientId": clientID, +func anonymousHeaders() map[string]string { + return map[string]string{ + "X-Jazz-ClientId": uuid.New().String(), headerAuthType: authTypeAnonymous, "X-Client-AuthType": authTypeAnonymous, headerContentType: contentTypeJSON, } +} + +func createRoom(ctx context.Context) (*roomInfo, error) { + headers := anonymousHeaders() createResp, err := createMeeting(ctx, headers) if err != nil { @@ -53,18 +58,13 @@ func createRoom(ctx context.Context) (*RoomInfo, error) { return nil, fmt.Errorf("preconnect: %w", err) } - return &RoomInfo{ + return &roomInfo{ RoomID: createResp.RoomID, Password: createResp.Password, ConnectorURL: connectorURL, }, nil } -// CreateRoom creates a SaluteJazz room and returns connection details for another peer to join. -func CreateRoom(ctx context.Context) (*RoomInfo, error) { - return createRoom(ctx) -} - type createResponse struct { RoomID string `json:"roomId"` Password string `json:"password"` @@ -113,7 +113,6 @@ func createMeeting(ctx context.Context, headers map[string]string) (*createRespo if err := json.NewDecoder(resp.Body).Decode(&res); err != nil { return nil, fmt.Errorf("decode create response: %w", err) } - return &res, nil } @@ -168,25 +167,16 @@ func preconnect(ctx context.Context, roomID, password string, headers map[string if err := json.NewDecoder(preResp.Body).Decode(&preconnectResp); err != nil { return "", fmt.Errorf("decode preconnect response: %w", err) } - return preconnectResp.ConnectorURL, nil } -func joinRoom(ctx context.Context, roomID, password string) (*RoomInfo, error) { - clientID := uuid.New().String() - headers := map[string]string{ - "X-Jazz-ClientId": clientID, - "X-Jazz-AuthType": authTypeAnonymous, - "X-Client-AuthType": authTypeAnonymous, - "Content-Type": "application/json", - } - +func joinRoom(ctx context.Context, roomID, password string) (*roomInfo, error) { + headers := anonymousHeaders() connectorURL, err := preconnect(ctx, roomID, password, headers) if err != nil { return nil, err } - - return &RoomInfo{ + return &roomInfo{ RoomID: roomID, Password: password, ConnectorURL: connectorURL, diff --git a/internal/auth/salutejazz/salutejazz.go b/internal/auth/salutejazz/salutejazz.go new file mode 100644 index 0000000..d166707 --- /dev/null +++ b/internal/auth/salutejazz/salutejazz.go @@ -0,0 +1,67 @@ +package salutejazz + +import ( + "context" + "fmt" + "strings" + + "github.com/openlibrecommunity/olcrtc/internal/auth" +) + +// Provider produces SaluteJazz credentials. +type Provider struct{} + +// Engine reports which engine consumes credentials from this auth provider. +func (Provider) Engine() string { return "salutejazz" } + +// Issue runs the SaluteJazz API flow and returns engine credentials. +// +// cfg.RoomURL accepts either an empty value (a new room is created on the +// fly, mirroring the legacy jazz provider) or ":". +func (Provider) Issue(ctx context.Context, cfg auth.Config) (auth.Credentials, error) { + roomRef := strings.TrimSpace(cfg.RoomURL) + var info *roomInfo + var err error + + switch roomRef { + case "", "any", "dummy": + info, err = createRoom(ctx) + if err != nil { + return auth.Credentials{}, fmt.Errorf("create room: %w", err) + } + default: + roomID, password, hasPassword := strings.Cut(roomRef, ":") + if !hasPassword { + return auth.Credentials{}, fmt.Errorf("%w: expected :", auth.ErrRoomIDRequired) + } + info, err = joinRoom(ctx, roomID, password) + if err != nil { + return auth.Credentials{}, fmt.Errorf("join room: %w", err) + } + } + + return auth.Credentials{ + URL: info.ConnectorURL, + Token: info.RoomID, + Extra: map[string]string{ + "password": info.Password, + "roomID": info.RoomID, + }, + }, nil +} + +// CreateRoom creates a new SaluteJazz room and returns ":". +// +// Returned format mirrors the legacy gen-mode output so existing +// subscriptions and tooling keep working. +func (Provider) CreateRoom(ctx context.Context, _ auth.Config) (string, error) { + info, err := createRoom(ctx) + if err != nil { + return "", fmt.Errorf("create room: %w", err) + } + return info.RoomID + ":" + info.Password, nil +} + +func init() { //nolint:gochecknoinits // auth registration is the canonical Go pattern for plugins + auth.Register("salutejazz", Provider{}) +} diff --git a/internal/carrier/builtin/engine_adapter.go b/internal/carrier/builtin/engine_adapter.go index 6bb92e5..fe12a11 100644 --- a/internal/carrier/builtin/engine_adapter.go +++ b/internal/carrier/builtin/engine_adapter.go @@ -30,6 +30,7 @@ func registerEngineAuth(carrierName string, authProvider auth.Provider) { URL: creds.URL, Token: creds.Token, Name: cfg.Name, + Extra: creds.Extra, OnData: cfg.OnData, DNSServer: cfg.DNSServer, ProxyAddr: cfg.ProxyAddr, diff --git a/internal/carrier/builtin/register.go b/internal/carrier/builtin/register.go index 6c45fb4..0f2f7a1 100644 --- a/internal/carrier/builtin/register.go +++ b/internal/carrier/builtin/register.go @@ -4,11 +4,12 @@ package builtin import ( "context" + authSaluteJazz "github.com/openlibrecommunity/olcrtc/internal/auth/salutejazz" authWBStream "github.com/openlibrecommunity/olcrtc/internal/auth/wbstream" "github.com/openlibrecommunity/olcrtc/internal/carrier" - _ "github.com/openlibrecommunity/olcrtc/internal/engine/livekit" // engine registration via init + _ "github.com/openlibrecommunity/olcrtc/internal/engine/livekit" // engine registration via init + _ "github.com/openlibrecommunity/olcrtc/internal/engine/salutejazz" // engine registration via init "github.com/openlibrecommunity/olcrtc/internal/provider" - "github.com/openlibrecommunity/olcrtc/internal/provider/jazz" "github.com/openlibrecommunity/olcrtc/internal/provider/telemost" ) @@ -17,12 +18,11 @@ type providerFactory func(context.Context, provider.Config) (provider.Provider, // Register wires the built-in carriers into the carrier registry. func Register() { // Legacy provider-based carriers (still being migrated to engine+auth). - registerProvider("jazz", jazz.New) registerProvider("telemost", telemost.New) - // Migrated to engine+auth: WB Stream now goes through the LiveKit engine - // with the wbstream auth provider. + // Migrated to engine+auth. registerEngineAuth("wbstream", authWBStream.Provider{}) + registerEngineAuth("jazz", authSaluteJazz.Provider{}) } func registerProvider(name string, factory providerFactory) { diff --git a/internal/e2e/tunnel_test.go b/internal/e2e/tunnel_test.go index 01a8c89..3144654 100644 --- a/internal/e2e/tunnel_test.go +++ b/internal/e2e/tunnel_test.go @@ -18,11 +18,11 @@ import ( "github.com/openlibrecommunity/olcrtc/internal/app/session" "github.com/openlibrecommunity/olcrtc/internal/auth" + authSaluteJazz "github.com/openlibrecommunity/olcrtc/internal/auth/salutejazz" authWBStream "github.com/openlibrecommunity/olcrtc/internal/auth/wbstream" "github.com/openlibrecommunity/olcrtc/internal/carrier" "github.com/openlibrecommunity/olcrtc/internal/client" "github.com/openlibrecommunity/olcrtc/internal/link" - "github.com/openlibrecommunity/olcrtc/internal/provider/jazz" "github.com/openlibrecommunity/olcrtc/internal/server" "github.com/pion/webrtc/v4" ) @@ -358,11 +358,11 @@ func realRoomURL(ctx context.Context, t *testing.T, carrierName string) string { if *realE2EJazzRoom != "" { return *realE2EJazzRoom } - room, err := jazz.CreateRoom(ctx) + room, err := authSaluteJazz.Provider{}.CreateRoom(ctx, auth.Config{Name: "olcrtc-e2e-room"}) if err != nil { t.Fatalf("create real jazz room: %v", err) } - return room.RoomID + ":" + room.Password + return room case "telemost": room := *realE2ETelemostRoom if room != "" && !strings.HasPrefix(room, "http://") && !strings.HasPrefix(room, "https://") { diff --git a/internal/engine/engine.go b/internal/engine/engine.go index 00d357f..3b037e0 100644 --- a/internal/engine/engine.go +++ b/internal/engine/engine.go @@ -32,10 +32,13 @@ type Capabilities struct { // Config is the runtime input to an engine factory. URL/Token are produced by // an auth provider (or supplied directly by the caller for "none" auth). +// Extra carries engine-specific fields that don't fit the common shape +// (e.g. SaluteJazz needs a separate room password alongside the room ID). type Config struct { URL string Token string Name string + Extra map[string]string OnData func([]byte) DNSServer string ProxyAddr string diff --git a/internal/provider/jazz/datapacket.go b/internal/engine/salutejazz/datapacket.go similarity index 97% rename from internal/provider/jazz/datapacket.go rename to internal/engine/salutejazz/datapacket.go index 7614fb8..c833b41 100644 --- a/internal/provider/jazz/datapacket.go +++ b/internal/engine/salutejazz/datapacket.go @@ -1,5 +1,4 @@ -// Package jazz implements the SaluteJazz WebRTC provider. -package jazz +package salutejazz import ( "encoding/binary" diff --git a/internal/engine/salutejazz/salutejazz.go b/internal/engine/salutejazz/salutejazz.go new file mode 100644 index 0000000..07c3a9d --- /dev/null +++ b/internal/engine/salutejazz/salutejazz.go @@ -0,0 +1,800 @@ +// Package salutejazz implements an engine.Session backed by the SaluteJazz +// signaling protocol (WS + SDP with publisher/subscriber peer connection +// split). The on-wire protocol is Sber-specific; the media plane is +// straightforward WebRTC. Token acquisition lives in the auth package. +package salutejazz + +import ( + "context" + "errors" + "fmt" + "sync" + "sync/atomic" + "time" + + "github.com/google/uuid" + "github.com/gorilla/websocket" + "github.com/openlibrecommunity/olcrtc/internal/engine" + "github.com/openlibrecommunity/olcrtc/internal/logger" + "github.com/openlibrecommunity/olcrtc/internal/protect" + "github.com/pion/webrtc/v4" +) + +const ( + maxDataChannelMessageSize = 12288 + sendDelay = 2 * time.Millisecond + + keyRoomID = "roomId" + keyEvent = "event" + keyRequestID = "requestId" + keyPayload = "payload" + + credentialKeyPassword = "password" + + defaultSendQueueSize = 5000 + mediaReadyTimeout = 30 * time.Second + dataChannelTimeout = 30 * time.Second + wsReadTimeout = 60 * time.Second + wsHandshakeTimeout = 15 * time.Second + sendQueueTimeout = 50 * time.Millisecond + closeWaitTimeout = 2 * time.Second + subscriberOfferGap = 300 * time.Millisecond +) + +var ( + // ErrPublisherNotInitialized is returned when the publisher peer connection is not set up. + ErrPublisherNotInitialized = errors.New("publisher peer connection not initialized") + // ErrSubscriberMediaTimeout is returned when the subscriber media is not ready in time. + ErrSubscriberMediaTimeout = errors.New("subscriber media timeout") + // ErrDataChannelTimeout is returned when the data channel fails to open in time. + ErrDataChannelTimeout = errors.New("datachannel timeout") + // ErrDataChannelNotReady is returned when send is called before the data channel is open. + ErrDataChannelNotReady = errors.New("datachannel not ready") + // ErrSendQueueClosed is returned when send is called after Close. + ErrSendQueueClosed = errors.New("send queue closed") + // ErrSendQueueTimeout is returned when the send queue cannot accept new data in time. + ErrSendQueueTimeout = errors.New("send queue timeout") + // ErrURLRequired is returned when no connector URL was supplied. + ErrURLRequired = errors.New("salutejazz connector URL required") + // ErrRoomIDRequired is returned when no room ID was supplied. + ErrRoomIDRequired = errors.New("salutejazz room ID required") +) + +// Session is the SaluteJazz engine handle. +type Session struct { + name string + connectorURL string + roomID string + password string + ws *websocket.Conn + wsMu sync.Mutex + pcSub *webrtc.PeerConnection + pcPub *webrtc.PeerConnection + dc *webrtc.DataChannel + onData func([]byte) + onReconnect func(*webrtc.DataChannel) + shouldReconnect func() bool + reconnectCh chan struct{} + closeCh chan struct{} + closed atomic.Bool + reconnecting atomic.Bool + sendQueue chan []byte + sendQueueClosed atomic.Bool + onEnded func(string) + sessionCloseCh chan struct{} + videoTrackMu sync.RWMutex + videoTracks []webrtc.TrackLocal + onVideoTrack func(*webrtc.TrackRemote, *webrtc.RTPReceiver) + subscriberReady atomic.Bool + publisherReady atomic.Bool + subscriberConn chan struct{} + publisherConn chan struct{} + wg sync.WaitGroup + groupID string +} + +// New creates a new SaluteJazz engine session. +// +// cfg.URL is the SaluteJazz connector WebSocket URL. cfg.Token carries the +// room ID; cfg.Extra["password"] carries the room password. These are +// produced by the salutejazz auth provider. +func New(_ context.Context, cfg engine.Config) (engine.Session, error) { + if cfg.URL == "" { + return nil, ErrURLRequired + } + // Token field encodes the room ID for this engine. + roomID := cfg.Token + if roomID == "" { + return nil, ErrRoomIDRequired + } + password := "" + if cfg.Extra != nil { + password = cfg.Extra[credentialKeyPassword] + } + + return &Session{ + name: cfg.Name, + connectorURL: cfg.URL, + roomID: roomID, + password: password, + onData: cfg.OnData, + reconnectCh: make(chan struct{}, 1), + closeCh: make(chan struct{}), + sessionCloseCh: make(chan struct{}), + sendQueue: make(chan []byte, defaultSendQueueSize), + subscriberConn: make(chan struct{}), + publisherConn: make(chan struct{}), + }, nil +} + +// Capabilities reports what this engine can do. +func (s *Session) Capabilities() engine.Capabilities { + return engine.Capabilities{ByteStream: true, VideoTrack: true} +} + +func (s *Session) resetMediaState() { + s.subscriberReady.Store(false) + s.publisherReady.Store(false) + s.subscriberConn = make(chan struct{}) + s.publisherConn = make(chan struct{}) +} + +func closeSignal(ch chan struct{}) { + select { + case <-ch: + default: + close(ch) + } +} + +func (s *Session) hasLocalVideoTracks() bool { + s.videoTrackMu.RLock() + defer s.videoTrackMu.RUnlock() + return len(s.videoTracks) > 0 +} + +func (s *Session) videoTrackHandler() func(*webrtc.TrackRemote, *webrtc.RTPReceiver) { + s.videoTrackMu.RLock() + defer s.videoTrackMu.RUnlock() + return s.onVideoTrack +} + +func (s *Session) attachPendingVideoTracks() error { + s.videoTrackMu.RLock() + defer s.videoTrackMu.RUnlock() + + for _, track := range s.videoTracks { + if _, err := s.pcPub.AddTrack(track); err != nil { + return fmt.Errorf("failed to add track: %w", err) + } + } + return nil +} + +func defaultWebRTCConfig() webrtc.Configuration { + return webrtc.Configuration{ + ICEServers: []webrtc.ICEServer{}, + SDPSemantics: webrtc.SDPSemanticsUnifiedPlan, + BundlePolicy: webrtc.BundlePolicyMaxBundle, + } +} + +func (s *Session) buildAPI() *webrtc.API { + se := webrtc.SettingEngine{} + if protect.Protector != nil { + se.SetICEProxyDialer(protect.NewProxyDialer()) + } + return webrtc.NewAPI(webrtc.WithSettingEngine(se)) +} + +func (s *Session) createPeerConnections(api *webrtc.API, config webrtc.Configuration) error { + var err error + s.pcSub, err = api.NewPeerConnection(config) + if err != nil { + return fmt.Errorf("create subscriber pc: %w", err) + } + s.pcSub.OnConnectionStateChange(s.onSubscriberConnectionStateChange) + s.pcSub.OnTrack(func(track *webrtc.TrackRemote, receiver *webrtc.RTPReceiver) { + if track.Kind() != webrtc.RTPCodecTypeVideo { + return + } + if cb := s.videoTrackHandler(); cb != nil { + cb(track, receiver) + } + }) + + s.pcPub, err = api.NewPeerConnection(config) + if err != nil { + return fmt.Errorf("create publisher pc: %w", err) + } + s.pcPub.OnConnectionStateChange(s.onPublisherConnectionStateChange) + return nil +} + +func (s *Session) createDataChannel() (chan struct{}, error) { + var err error + s.dc, err = s.pcPub.CreateDataChannel("_reliable", &webrtc.DataChannelInit{ + Ordered: func() *bool { v := true; return &v }(), + }) + if err != nil { + return nil, fmt.Errorf("create datachannel: %w", err) + } + dcReady := make(chan struct{}) + s.setupDataChannelHandlers(dcReady) + return dcReady, nil +} + +func (s *Session) waitForReady(ctx context.Context, dcReady chan struct{}) error { + if dcReady != nil { + select { + case <-dcReady: + return nil + case <-time.After(dataChannelTimeout): + return ErrDataChannelTimeout + case <-ctx.Done(): + return fmt.Errorf("connect canceled: %w", ctx.Err()) + } + } + return s.waitForMediaReady(ctx, mediaReadyTimeout) +} + +// Connect starts the WebRTC connection process. +func (s *Session) Connect(ctx context.Context) error { + s.closed.Store(false) + s.resetMediaState() + + api := s.buildAPI() + config := defaultWebRTCConfig() + + if err := s.createPeerConnections(api, config); err != nil { + return err + } + if err := s.attachPendingVideoTracks(); err != nil { + return err + } + + var dcReady chan struct{} + if s.onData != nil { + var err error + dcReady, err = s.createDataChannel() + if err != nil { + return err + } + } + + if err := s.dialWebSocket(); err != nil { + return err + } + if err := s.sendJoin(); err != nil { + return err + } + + s.wg.Add(1) + go func() { + defer s.wg.Done() + s.handleSignaling(ctx) + }() + + return s.waitForReady(ctx, dcReady) +} + +func (s *Session) waitForMediaReady(ctx context.Context, timeout time.Duration) error { + timer := time.NewTimer(timeout) + defer timer.Stop() + + select { + case <-s.subscriberConn: + case <-timer.C: + return ErrSubscriberMediaTimeout + case <-ctx.Done(): + return fmt.Errorf("connect cancelled: %w", ctx.Err()) + } + return nil +} + +func (s *Session) dialWebSocket() error { + wsDialer := websocket.Dialer{ + NetDialContext: protect.DialContext, + HandshakeTimeout: wsHandshakeTimeout, + } + + ws, resp, err := wsDialer.Dial(s.connectorURL, nil) + if err != nil { + return fmt.Errorf("dial websocket: %w", err) + } + if resp != nil && resp.Body != nil { + _ = resp.Body.Close() + } + + s.ws = ws + ws.SetPongHandler(func(string) error { + _ = ws.SetReadDeadline(time.Now().Add(wsReadTimeout)) + return nil + }) + _ = ws.SetReadDeadline(time.Now().Add(wsReadTimeout)) + return nil +} + +func (s *Session) sendJoin() error { + joinMsg := map[string]any{ + keyRoomID: s.roomID, + keyEvent: "join", + keyRequestID: uuid.New().String(), + keyPayload: map[string]any{ + "password": s.password, + "participantName": s.name, + "supportedFeatures": map[string]any{ + "attachedRooms": true, + "sessionGroups": true, + "transcription": true, + }, + "isSilent": false, + }, + } + + s.wsMu.Lock() + defer s.wsMu.Unlock() + if err := s.ws.WriteJSON(joinMsg); err != nil { + return fmt.Errorf("write join json: %w", err) + } + return nil +} + +func (s *Session) setupDataChannelHandlers(dcReady chan struct{}) { + s.dc.OnOpen(func() { + logger.Verbosef("[salutejazz] Publisher DC opened: %s", s.dc.Label()) + s.wg.Add(1) + go func() { + defer s.wg.Done() + s.processSendQueue() + }() + close(dcReady) + }) + + s.dc.OnClose(func() { + logger.Verbosef("[salutejazz] Publisher DC closed") + if !s.closed.Load() { + s.queueReconnect() + } + }) + + s.dc.OnMessage(func(msg webrtc.DataChannelMessage) { + s.handleIncomingMessage(msg.Data, "publisher") + }) + + s.pcSub.OnDataChannel(func(dc *webrtc.DataChannel) { + logger.Verbosef("[salutejazz] Received subscriber DataChannel: %s", dc.Label()) + if dc.Label() != "_reliable" { + return + } + if s.onData != nil { + dc.OnMessage(func(msg webrtc.DataChannelMessage) { + s.handleIncomingMessage(msg.Data, "subscriber") + }) + } + }) +} + +func (s *Session) onSubscriberConnectionStateChange(state webrtc.PeerConnectionState) { + switch state { + case webrtc.PeerConnectionStateConnected: + s.subscriberReady.Store(true) + closeSignal(s.subscriberConn) + case webrtc.PeerConnectionStateDisconnected, webrtc.PeerConnectionStateFailed: + s.subscriberReady.Store(false) + if !s.closed.Load() { + s.queueReconnect() + } + case webrtc.PeerConnectionStateClosed: + s.subscriberReady.Store(false) + case webrtc.PeerConnectionStateUnknown, + webrtc.PeerConnectionStateNew, + webrtc.PeerConnectionStateConnecting: + } +} + +func (s *Session) onPublisherConnectionStateChange(state webrtc.PeerConnectionState) { + switch state { + case webrtc.PeerConnectionStateConnected: + s.publisherReady.Store(true) + closeSignal(s.publisherConn) + case webrtc.PeerConnectionStateDisconnected, webrtc.PeerConnectionStateFailed: + s.publisherReady.Store(false) + if !s.closed.Load() { + s.queueReconnect() + } + case webrtc.PeerConnectionStateClosed: + s.publisherReady.Store(false) + case webrtc.PeerConnectionStateUnknown, + webrtc.PeerConnectionStateNew, + webrtc.PeerConnectionStateConnecting: + } +} + +func (s *Session) handleIncomingMessage(data []byte, source string) { + logger.Verbosef("[salutejazz] Received %d bytes on %s DC (raw)", len(data), source) + + payload, ok := DecodeDataPacket(data) + if !ok { + logger.Debugf("[salutejazz] Failed to decode DataPacket, trying raw") + if s.onData != nil && len(data) > 0 { + s.onData(data) + } + return + } + + logger.Verbosef("[salutejazz] Decoded DataPacket: %d bytes payload", len(payload)) + if s.onData != nil && len(payload) > 0 { + s.onData(payload) + } +} + +func (s *Session) handleSignaling(_ context.Context) { + for { + var msg map[string]any + if err := s.ws.ReadJSON(&msg); err != nil { + if !s.closed.Load() { + logger.Debugf("ws read error: %v", err) + s.queueReconnect() + } + return + } + + s.updateWSDeadline() + + event, _ := msg[keyEvent].(string) + payload, _ := msg[keyPayload].(map[string]any) + + switch event { + case "join-response": + s.handleJoinResponse(payload) + case "media-out": + s.handleMediaOut(payload) + } + } +} + +func (s *Session) handleJoinResponse(payload map[string]any) { + group, _ := payload["participantGroup"].(map[string]any) + s.groupID, _ = group["groupId"].(string) + logger.Verbosef("[salutejazz] peer joined: groupId=%s", s.groupID) +} + +func (s *Session) handleMediaOut(payload map[string]any) { + method, _ := payload["method"].(string) + + switch method { + case "rtc:config": + s.handleRTCConfig(payload) + case "rtc:join": + logger.Verbosef("[salutejazz] rtc:join received") + case "rtc:offer": + s.handleSubscriberOffer(payload) + case "rtc:answer": + s.handlePublisherAnswer(payload) + case "rtc:ice": + s.handleICE(payload) + } +} + +func (s *Session) handleRTCConfig(payload map[string]any) { + config, _ := payload["configuration"].(map[string]any) + servers, _ := config["iceServers"].([]any) + + var iceServers []webrtc.ICEServer + for _, srv := range servers { + server, _ := srv.(map[string]any) + urls, _ := server["urls"].([]any) + username, _ := server["username"].(string) + credential, _ := server["credential"].(string) + + var urlStrs []string + for _, u := range urls { + if urlStr, ok := u.(string); ok && urlStr != "" { + urlStrs = append(urlStrs, urlStr) + } + } + + if len(urlStrs) > 0 { + iceServers = append(iceServers, webrtc.ICEServer{ + URLs: urlStrs, + Username: username, + Credential: credential, + }) + } + } + + if len(iceServers) > 0 { + newConfig := webrtc.Configuration{ + ICEServers: iceServers, + SDPSemantics: webrtc.SDPSemanticsUnifiedPlan, + BundlePolicy: webrtc.BundlePolicyMaxBundle, + } + _ = s.pcSub.SetConfiguration(newConfig) + _ = s.pcPub.SetConfiguration(newConfig) + } +} + +func (s *Session) handleSubscriberOffer(payload map[string]any) { + desc, _ := payload["description"].(map[string]any) + sdp, _ := desc["sdp"].(string) + + if err := s.pcSub.SetRemoteDescription(webrtc.SessionDescription{ + Type: webrtc.SDPTypeOffer, + SDP: sdp, + }); err != nil { + logger.Debugf("set remote desc error: %v", err) + return + } + + answer, err := s.pcSub.CreateAnswer(nil) + if err != nil { + logger.Debugf("create answer error: %v", err) + return + } + + if err := s.pcSub.SetLocalDescription(answer); err != nil { + logger.Debugf("set local desc error: %v", err) + return + } + + s.wsMu.Lock() + _ = s.ws.WriteJSON(map[string]any{ + keyRoomID: s.roomID, + keyEvent: "media-in", + "groupId": s.groupID, + keyRequestID: uuid.New().String(), + keyPayload: map[string]any{ + "method": "rtc:answer", + "description": map[string]any{ + "type": "answer", + "sdp": answer.SDP, + }, + }, + }) + s.wsMu.Unlock() + + time.Sleep(subscriberOfferGap) + s.sendPublisherOffer() +} + +func (s *Session) sendPublisherOffer() { + offer, err := s.pcPub.CreateOffer(nil) + if err != nil { + logger.Debugf("create pub offer error: %v", err) + return + } + + if err := s.pcPub.SetLocalDescription(offer); err != nil { + logger.Debugf("set local pub desc error: %v", err) + return + } + + s.wsMu.Lock() + _ = s.ws.WriteJSON(map[string]any{ + keyRoomID: s.roomID, + keyEvent: "media-in", + "groupId": s.groupID, + keyRequestID: uuid.New().String(), + keyPayload: map[string]any{ + "method": "rtc:offer", + "description": map[string]any{ + "type": "offer", + "sdp": offer.SDP, + }, + }, + }) + s.wsMu.Unlock() +} + +func (s *Session) handlePublisherAnswer(payload map[string]any) { + desc, _ := payload["description"].(map[string]any) + sdp, _ := desc["sdp"].(string) + + if err := s.pcPub.SetRemoteDescription(webrtc.SessionDescription{ + Type: webrtc.SDPTypeAnswer, + SDP: sdp, + }); err != nil { + logger.Debugf("set remote pub desc error: %v", err) + } +} + +func (s *Session) handleICE(payload map[string]any) { + candidates, _ := payload["rtcIceCandidates"].([]any) + + for _, c := range candidates { + cand, _ := c.(map[string]any) + candStr, _ := cand["candidate"].(string) + target, _ := cand["target"].(string) + sdpMid, _ := cand["sdpMid"].(string) + sdpMLineIndex, _ := cand["sdpMLineIndex"].(float64) + + init := webrtc.ICECandidateInit{ + Candidate: candStr, + SDPMid: &sdpMid, + SDPMLineIndex: func() *uint16 { v := uint16(sdpMLineIndex); return &v }(), + } + + switch target { + case "SUBSCRIBER": + _ = s.pcSub.AddICECandidate(init) + case "PUBLISHER": + _ = s.pcPub.AddICECandidate(init) + } + } +} + +func (s *Session) updateWSDeadline() { + s.wsMu.Lock() + if s.ws != nil { + _ = s.ws.SetReadDeadline(time.Now().Add(wsReadTimeout)) + } + s.wsMu.Unlock() +} + +// Send queues data for transmission. +func (s *Session) Send(data []byte) error { + if s.dc == nil || s.dc.ReadyState() != webrtc.DataChannelStateOpen { + return ErrDataChannelNotReady + } + if s.sendQueueClosed.Load() { + return ErrSendQueueClosed + } + + select { + case s.sendQueue <- data: + return nil + case <-time.After(sendQueueTimeout): + return ErrSendQueueTimeout + } +} + +func (s *Session) processSendQueue() { + for { + select { + case <-s.sessionCloseCh: + return + case <-s.closeCh: + return + case data := <-s.sendQueue: + if len(data) > maxDataChannelMessageSize { + logger.Debugf("[salutejazz] Message too large: %d bytes (max %d)", len(data), maxDataChannelMessageSize) + continue + } + + encoded := EncodeDataPacket(data) + logger.Verbosef("[salutejazz] Sending %d bytes (encoded to %d bytes)", len(data), len(encoded)) + + if err := s.dc.Send(encoded); err != nil { + logger.Debugf("send error: %v", err) + s.queueReconnect() + return + } + time.Sleep(sendDelay) + } + } +} + +// Close terminates the connection. +func (s *Session) Close() error { + s.closed.Store(true) + s.sendQueueClosed.Store(true) + + close(s.closeCh) + + done := make(chan struct{}) + go func() { + s.wg.Wait() + close(done) + }() + + select { + case <-done: + case <-time.After(closeWaitTimeout): + } + + if s.dc != nil { + _ = s.dc.Close() + } + if s.pcPub != nil { + _ = s.pcPub.Close() + } + if s.pcSub != nil { + _ = s.pcSub.Close() + } + if s.ws != nil { + s.wsMu.Lock() + _ = s.ws.WriteControl(websocket.CloseMessage, + websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""), + time.Now().Add(time.Second)) + _ = s.ws.Close() + s.wsMu.Unlock() + } + return nil +} + +// AddVideoTrack adds a video track to the publisher peer connection. +func (s *Session) AddVideoTrack(track webrtc.TrackLocal) error { + s.videoTrackMu.Lock() + s.videoTracks = append(s.videoTracks, track) + s.videoTrackMu.Unlock() + + if s.pcPub == nil { + return nil + } + if _, err := s.pcPub.AddTrack(track); err != nil { + return fmt.Errorf("failed to add track: %w", err) + } + return nil +} + +// SetVideoTrackHandler registers a callback for remote video tracks. +func (s *Session) SetVideoTrackHandler(cb func(*webrtc.TrackRemote, *webrtc.RTPReceiver)) { + s.videoTrackMu.Lock() + defer s.videoTrackMu.Unlock() + s.onVideoTrack = cb +} + +// SetReconnectCallback sets the callback for reconnection events. +func (s *Session) SetReconnectCallback(cb func(*webrtc.DataChannel)) { s.onReconnect = cb } + +// SetShouldReconnect sets the policy for reconnection. +func (s *Session) SetShouldReconnect(fn func() bool) { s.shouldReconnect = fn } + +// SetEndedCallback sets the callback for connection termination. +func (s *Session) SetEndedCallback(cb func(string)) { s.onEnded = cb } + +// WatchConnection monitors the connection lifecycle. +func (s *Session) WatchConnection(ctx context.Context) { + for { + select { + case <-ctx.Done(): + return + case <-s.closeCh: + return + case <-s.reconnectCh: + } + } +} + +// CanSend checks if data can be sent. +func (s *Session) CanSend() bool { + if s.onData == nil { + if s.hasLocalVideoTracks() { + return !s.closed.Load() && s.subscriberReady.Load() && s.publisherReady.Load() + } + return !s.closed.Load() && s.subscriberReady.Load() + } + if s.dc == nil || s.dc.ReadyState() != webrtc.DataChannelStateOpen { + return false + } + return len(s.sendQueue) < 4000 +} + +// GetSendQueue returns the transmission queue. +func (s *Session) GetSendQueue() chan []byte { return s.sendQueue } + +// GetBufferedAmount returns the WebRTC buffered amount. +func (s *Session) GetBufferedAmount() uint64 { + if s.dc != nil { + return s.dc.BufferedAmount() + } + return 0 +} + +func (s *Session) queueReconnect() { + if s.closed.Load() || s.reconnecting.Load() { + return + } + if s.shouldReconnect != nil && !s.shouldReconnect() { + return + } + select { + case s.reconnectCh <- struct{}{}: + default: + } +} + +func init() { //nolint:gochecknoinits // engine registration is the canonical Go pattern for plugins + engine.Register("salutejazz", New) +} diff --git a/internal/provider/jazz/api_test.go b/internal/provider/jazz/api_test.go deleted file mode 100644 index 4bdf683..0000000 --- a/internal/provider/jazz/api_test.go +++ /dev/null @@ -1,142 +0,0 @@ -package jazz - -import ( - "context" - "encoding/json" - "errors" - "net/http" - "net/http/httptest" - "strings" - "testing" -) - -func withJazzAPIServer(t *testing.T, h http.Handler) { - t.Helper() - old := apiBase - srv := httptest.NewServer(h) - t.Cleanup(func() { - apiBase = old - srv.Close() - }) - apiBase = srv.URL -} - -//nolint:cyclop // table-driven test naturally has many branches -func TestCreateMeetingAndPreconnect(t *testing.T) { - withJazzAPIServer(t, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - if r.Header.Get(headerAuthType) != authTypeAnonymous { - t.Fatalf("missing auth header: %v", r.Header) - } - switch r.URL.Path { - case "/room/create-meeting": //nolint:goconst // test literal, repetition is intentional - if r.Method != http.MethodPost { - t.Fatalf("create method = %s", r.Method) - } - _ = json.NewEncoder(w).Encode(createResponse{RoomID: "room-1", Password: "pass"}) //nolint:gosec,lll // G117: test-only struct mirroring upstream API shape - case "/room/room-1/preconnect": - if r.Method != http.MethodPost { - t.Fatalf("preconnect method = %s", r.Method) - } - _ = json.NewEncoder(w).Encode(map[string]string{"connectorUrl": "wss://connector"}) //nolint:goconst,lll // test literal, repetition is intentional - default: - http.NotFound(w, r) - } - })) - - headers := map[string]string{ - headerAuthType: authTypeAnonymous, - "Content-Type": "application/json", - } - created, err := createMeeting(context.Background(), headers) - if err != nil { - t.Fatalf("createMeeting() error = %v", err) - } - if created.RoomID != "room-1" || created.Password != "pass" { - t.Fatalf("createMeeting() = %+v", created) - } - - connector, err := preconnect(context.Background(), "room-1", "pass", headers) - if err != nil { - t.Fatalf("preconnect() error = %v", err) - } - if connector != "wss://connector" { - t.Fatalf("preconnect() = %q", connector) - } -} - -//nolint:cyclop // table-driven test naturally has many branches -func TestCreateRoomAndJoinRoom(t *testing.T) { - withJazzAPIServer(t, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - switch r.URL.Path { - case "/room/create-meeting": - _ = json.NewEncoder(w).Encode(createResponse{RoomID: "new-room", Password: "new-pass"}) //nolint:goconst,gosec,lll // test literal; G117 is a false positive for test fixtures - case "/room/new-room/preconnect", "/room/existing/preconnect": - _ = json.NewEncoder(w).Encode(map[string]string{"connectorUrl": "wss://connector"}) - default: - http.NotFound(w, r) - } - })) - - room, err := createRoom(context.Background()) - if err != nil { - t.Fatalf("createRoom() error = %v", err) - } - if room.RoomID != "new-room" || room.Password != "new-pass" || room.ConnectorURL != "wss://connector" { - t.Fatalf("createRoom() = %+v", room) - } - - room, err = joinRoom(context.Background(), "existing", "secret") - if err != nil { - t.Fatalf("joinRoom() error = %v", err) - } - if room.RoomID != "existing" || room.Password != "secret" || room.ConnectorURL != "wss://connector" { - t.Fatalf("joinRoom() = %+v", room) - } -} - -func TestJazzAPIErrors(t *testing.T) { - withJazzAPIServer(t, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - switch { - case strings.Contains(r.URL.Path, "create-meeting"): - http.Error(w, "bad", http.StatusTeapot) - default: - http.Error(w, "bad", http.StatusInternalServerError) - } - })) - - if _, err := createMeeting(context.Background(), nil); !errors.Is(err, errCreateRoomFailed) { - t.Fatalf("createMeeting() error = %v, want %v", err, errCreateRoomFailed) - } - if _, err := preconnect(context.Background(), "room", "pass", nil); !errors.Is(err, errPreconnectFailed) { - t.Fatalf("preconnect() error = %v, want %v", err, errPreconnectFailed) - } -} - -func TestNewPeerUsesRoomAPI(t *testing.T) { - withJazzAPIServer(t, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - switch r.URL.Path { - case "/room/create-meeting": - _ = json.NewEncoder(w).Encode(createResponse{RoomID: "new-room", Password: "new-pass"}) //nolint:gosec,lll // G117: test-only struct mirroring upstream API shape - case "/room/new-room/preconnect", "/room/existing/preconnect": - _ = json.NewEncoder(w).Encode(map[string]string{"connectorUrl": "wss://connector"}) - default: - http.NotFound(w, r) - } - })) - - created, err := NewPeer(context.Background(), "any", "peer", nil) - if err != nil { - t.Fatalf("NewPeer(create) error = %v", err) - } - if created.roomInfo.RoomID != "new-room" { - t.Fatalf("created room = %+v", created.roomInfo) - } - - joined, err := NewPeer(context.Background(), "existing:secret", "peer", nil) - if err != nil { - t.Fatalf("NewPeer(join) error = %v", err) - } - if joined.roomInfo.RoomID != "existing" || joined.roomInfo.Password != "secret" { - t.Fatalf("joined room = %+v", joined.roomInfo) - } -} diff --git a/internal/provider/jazz/datapacket_test.go b/internal/provider/jazz/datapacket_test.go deleted file mode 100644 index 7f87a30..0000000 --- a/internal/provider/jazz/datapacket_test.go +++ /dev/null @@ -1,70 +0,0 @@ -package jazz - -import ( - "bytes" - "errors" - "io" - "testing" -) - -func TestDataPacketRoundTrip(t *testing.T) { - payload := []byte("hello jazz") - raw := EncodeDataPacket(payload) - - got, ok := DecodeDataPacket(raw) - if !ok { - t.Fatal("DecodeDataPacket() ok = false") - } - if !bytes.Equal(got, payload) { - t.Fatalf("DecodeDataPacket() = %q, want %q", got, payload) - } -} - -func TestDecodeDataPacketRejectsMalformedPackets(t *testing.T) { - tests := [][]byte{ - nil, - {0xff}, - encodeField(1, 0, encodeVarint(0)), - {byte(2<<3 | 2), 10, 1}, - {byte(3<<3 | 7), 0}, - } - - for _, raw := range tests { - if payload, ok := DecodeDataPacket(raw); ok { - t.Fatalf("DecodeDataPacket(%v) = (%q, true), want false", raw, payload) - } - } -} - -func TestParseFieldsSkipsSupportedNonTargetWireTypes(t *testing.T) { - data := encodeField(1, 0, encodeVarint(150)) - data = append(data, encodeField(3, 1, []byte("12345678"))...) - data = append(data, encodeField(4, 5, []byte("1234"))...) - data = append(data, encodeField(2, 2, []byte("target"))...) - - got, ok := parseFields(data, 2) - if !ok || string(got) != "target" { - t.Fatalf("parseFields() = (%q, %v), want target", got, ok) - } -} - -func TestByteReader(t *testing.T) { - r := &byteReader{data: []byte{1, 2, 3}} - b, err := r.ReadByte() - if err != nil || b != 1 { - t.Fatalf("ReadByte() = (%d, %v), want (1, nil)", b, err) - } - - buf := make([]byte, 4) - n, err := r.Read(buf) - if err != nil || n != 2 || !bytes.Equal(buf[:n], []byte{2, 3}) { - t.Fatalf("Read() = (%d, %v, %v), want two bytes", n, err, buf[:n]) - } - - if _, err := r.ReadByte(); !errors.Is(err, io.EOF) { - t.Fatalf("ReadByte() error = %v, want EOF", err) - } - if n, err := r.Read(buf); !errors.Is(err, io.EOF) || n != 0 { - t.Fatalf("Read() = (%d, %v), want (0, EOF)", n, err) - } -} diff --git a/internal/provider/jazz/peer.go b/internal/provider/jazz/peer.go deleted file mode 100644 index 63cce68..0000000 --- a/internal/provider/jazz/peer.go +++ /dev/null @@ -1,785 +0,0 @@ -// Package jazz implements the SaluteJazz WebRTC provider. -package jazz - -import ( - "context" - "errors" - "fmt" - "log" - "strings" - "sync" - "sync/atomic" - "time" - - "github.com/google/uuid" - "github.com/gorilla/websocket" - "github.com/openlibrecommunity/olcrtc/internal/logger" - "github.com/openlibrecommunity/olcrtc/internal/protect" - "github.com/openlibrecommunity/olcrtc/internal/provider" - "github.com/pion/webrtc/v4" -) - -const ( - maxDataChannelMessageSize = 12288 - sendDelay = 2 * time.Millisecond - - keyRoomID = "roomId" - keyEvent = "event" - keyRequestID = "requestId" - keyPayload = "payload" -) - -var ( - // ErrPublisherNotInitialized is returned when the publisher peer connection is not set up. - ErrPublisherNotInitialized = errors.New("publisher peer connection not initialized") - // ErrSubscriberMediaTimeout is returned when the subscriber media is not ready within the timeout period. - ErrSubscriberMediaTimeout = errors.New("subscriber media timeout") -) - -// Peer represents a SaluteJazz WebRTC connection. -type Peer struct { - name string - roomInfo *RoomInfo - ws *websocket.Conn - wsMu sync.Mutex - pcSub *webrtc.PeerConnection - pcPub *webrtc.PeerConnection - dc *webrtc.DataChannel - onData func([]byte) - onReconnect func(*webrtc.DataChannel) - shouldReconnect func() bool - reconnectCh chan struct{} - closeCh chan struct{} - closed atomic.Bool - reconnecting atomic.Bool - sendQueue chan []byte - sendQueueClosed atomic.Bool - onEnded func(string) - sessionCloseCh chan struct{} - videoTrackMu sync.RWMutex - videoTracks []webrtc.TrackLocal - onVideoTrack func(*webrtc.TrackRemote, *webrtc.RTPReceiver) - subscriberReady atomic.Bool - publisherReady atomic.Bool - subscriberConn chan struct{} - publisherConn chan struct{} - wg sync.WaitGroup - groupID string -} - -// NewPeer creates a new Jazz provider peer. -func NewPeer(ctx context.Context, roomID, name string, onData func([]byte)) (*Peer, error) { - var roomInfo *RoomInfo - var err error - - if roomID == "" || roomID == "any" || roomID == "dummy" { - roomInfo, err = createRoom(ctx) - if err != nil { - return nil, fmt.Errorf("create room: %w", err) - } - log.Printf("Jazz room created: %s:%s", roomInfo.RoomID, roomInfo.Password) - log.Printf("To connect client use: -id \"%s:%s\"", roomInfo.RoomID, roomInfo.Password) - } else { - var password string - parts := strings.Split(roomID, ":") - if len(parts) == 2 { - roomID = parts[0] - password = parts[1] - } - - roomInfo, err = joinRoom(ctx, roomID, password) - if err != nil { - return nil, fmt.Errorf("join room: %w", err) - } - log.Printf("Jazz joining room: %s", roomInfo.RoomID) - } - - return &Peer{ - name: name, - roomInfo: roomInfo, - onData: onData, - reconnectCh: make(chan struct{}, 1), - closeCh: make(chan struct{}), - sessionCloseCh: make(chan struct{}), - sendQueue: make(chan []byte, 5000), - subscriberConn: make(chan struct{}), - publisherConn: make(chan struct{}), - }, nil -} - -func (p *Peer) resetMediaState() { - p.subscriberReady.Store(false) - p.publisherReady.Store(false) - p.subscriberConn = make(chan struct{}) - p.publisherConn = make(chan struct{}) -} - -func closeSignal(ch chan struct{}) { - select { - case <-ch: - default: - close(ch) - } -} - -func (p *Peer) hasLocalVideoTracks() bool { - p.videoTrackMu.RLock() - defer p.videoTrackMu.RUnlock() - return len(p.videoTracks) > 0 -} - -func (p *Peer) videoTrackHandler() func(*webrtc.TrackRemote, *webrtc.RTPReceiver) { - p.videoTrackMu.RLock() - defer p.videoTrackMu.RUnlock() - return p.onVideoTrack -} - -func (p *Peer) attachPendingVideoTracks() error { - p.videoTrackMu.RLock() - defer p.videoTrackMu.RUnlock() - - for _, track := range p.videoTracks { - if _, err := p.pcPub.AddTrack(track); err != nil { - return fmt.Errorf("failed to add track: %w", err) - } - } - - return nil -} - -func defaultWebRTCConfig() webrtc.Configuration { - return webrtc.Configuration{ - ICEServers: []webrtc.ICEServer{}, - SDPSemantics: webrtc.SDPSemanticsUnifiedPlan, - BundlePolicy: webrtc.BundlePolicyMaxBundle, - } -} - -func (p *Peer) buildAPI() *webrtc.API { - se := webrtc.SettingEngine{} - if protect.Protector != nil { - se.SetICEProxyDialer(protect.NewProxyDialer()) - } - return webrtc.NewAPI(webrtc.WithSettingEngine(se)) -} - -func (p *Peer) createPeerConnections(api *webrtc.API, config webrtc.Configuration) error { - var err error - p.pcSub, err = api.NewPeerConnection(config) - if err != nil { - return fmt.Errorf("create subscriber pc: %w", err) - } - p.pcSub.OnConnectionStateChange(p.onSubscriberConnectionStateChange) - p.pcSub.OnTrack(func(track *webrtc.TrackRemote, receiver *webrtc.RTPReceiver) { - if track.Kind() != webrtc.RTPCodecTypeVideo { - return - } - if cb := p.videoTrackHandler(); cb != nil { - cb(track, receiver) - } - }) - - p.pcPub, err = api.NewPeerConnection(config) - if err != nil { - return fmt.Errorf("create publisher pc: %w", err) - } - p.pcPub.OnConnectionStateChange(p.onPublisherConnectionStateChange) - return nil -} - -func (p *Peer) createDataChannel() (chan struct{}, error) { - var err error - p.dc, err = p.pcPub.CreateDataChannel("_reliable", &webrtc.DataChannelInit{ - Ordered: func() *bool { v := true; return &v }(), - }) - if err != nil { - return nil, fmt.Errorf("create datachannel: %w", err) - } - dcReady := make(chan struct{}) - p.setupDataChannelHandlers(dcReady) - return dcReady, nil -} - -func (p *Peer) waitForReady(ctx context.Context, dcReady chan struct{}) error { - if dcReady != nil { - select { - case <-dcReady: - return nil - case <-time.After(30 * time.Second): - return provider.ErrDataChannelTimeout - case <-ctx.Done(): - return fmt.Errorf("connect canceled: %w", ctx.Err()) - } - } - return p.waitForMediaReady(ctx, 30*time.Second) -} - -// Connect starts the WebRTC connection process. -func (p *Peer) Connect(ctx context.Context) error { - p.closed.Store(false) - p.resetMediaState() - - api := p.buildAPI() - config := defaultWebRTCConfig() - - if err := p.createPeerConnections(api, config); err != nil { - return err - } - if err := p.attachPendingVideoTracks(); err != nil { - return err - } - - var dcReady chan struct{} - if p.onData != nil { - var err error - dcReady, err = p.createDataChannel() - if err != nil { - return err - } - } - - if err := p.dialWebSocket(); err != nil { - return err - } - if err := p.sendJoin(); err != nil { - return err - } - - p.wg.Add(1) - go func() { - defer p.wg.Done() - p.handleSignaling(ctx) - }() - - return p.waitForReady(ctx, dcReady) -} - -func (p *Peer) waitForMediaReady(ctx context.Context, timeout time.Duration) error { - timer := time.NewTimer(timeout) - defer timer.Stop() - - select { - case <-p.subscriberConn: - case <-timer.C: - return ErrSubscriberMediaTimeout - case <-ctx.Done(): - return fmt.Errorf("connect cancelled: %w", ctx.Err()) - } - - return nil -} - -func (p *Peer) dialWebSocket() error { - wsDialer := websocket.Dialer{ - NetDialContext: protect.DialContext, - HandshakeTimeout: 15 * time.Second, - } - - ws, resp, err := wsDialer.Dial(p.roomInfo.ConnectorURL, nil) - if err != nil { - return fmt.Errorf("dial websocket: %w", err) - } - if resp != nil && resp.Body != nil { - _ = resp.Body.Close() - } - - p.ws = ws - ws.SetPongHandler(func(string) error { - _ = ws.SetReadDeadline(time.Now().Add(60 * time.Second)) - return nil - }) - _ = ws.SetReadDeadline(time.Now().Add(60 * time.Second)) - - return nil -} - -func (p *Peer) sendJoin() error { - joinMsg := map[string]any{ - keyRoomID: p.roomInfo.RoomID, - keyEvent: "join", - keyRequestID: uuid.New().String(), - keyPayload: map[string]any{ - "password": p.roomInfo.Password, - "participantName": p.name, - "supportedFeatures": map[string]any{ - "attachedRooms": true, - "sessionGroups": true, - "transcription": true, - }, - "isSilent": false, - }, - } - - p.wsMu.Lock() - defer p.wsMu.Unlock() - if err := p.ws.WriteJSON(joinMsg); err != nil { - return fmt.Errorf("write join json: %w", err) - } - return nil -} - -func (p *Peer) setupDataChannelHandlers(dcReady chan struct{}) { - p.dc.OnOpen(func() { - logger.Verbosef("[Jazz] Publisher DC opened: %s", p.dc.Label()) - p.wg.Add(1) - go func() { - defer p.wg.Done() - p.processSendQueue() - }() - close(dcReady) - }) - - p.dc.OnClose(func() { - logger.Verbosef("[Jazz] Publisher DC closed") - if !p.closed.Load() { - p.queueReconnect() - } - }) - - p.dc.OnMessage(func(msg webrtc.DataChannelMessage) { - p.handleIncomingMessage(msg.Data, "publisher") - }) - - p.pcSub.OnDataChannel(func(dc *webrtc.DataChannel) { - logger.Verbosef("[Jazz] Received subscriber DataChannel: %s", dc.Label()) - if dc.Label() != "_reliable" { - return - } - - if p.onData != nil { - dc.OnMessage(func(msg webrtc.DataChannelMessage) { - p.handleIncomingMessage(msg.Data, "subscriber") - }) - } - }) -} - -func (p *Peer) onSubscriberConnectionStateChange(state webrtc.PeerConnectionState) { - switch state { - case webrtc.PeerConnectionStateConnected: - p.subscriberReady.Store(true) - closeSignal(p.subscriberConn) - case webrtc.PeerConnectionStateDisconnected, webrtc.PeerConnectionStateFailed: - p.subscriberReady.Store(false) - if !p.closed.Load() { - p.queueReconnect() - } - case webrtc.PeerConnectionStateClosed: - p.subscriberReady.Store(false) - case webrtc.PeerConnectionStateUnknown, - webrtc.PeerConnectionStateNew, - webrtc.PeerConnectionStateConnecting: - } -} - -func (p *Peer) onPublisherConnectionStateChange(state webrtc.PeerConnectionState) { - switch state { - case webrtc.PeerConnectionStateConnected: - p.publisherReady.Store(true) - closeSignal(p.publisherConn) - case webrtc.PeerConnectionStateDisconnected, webrtc.PeerConnectionStateFailed: - p.publisherReady.Store(false) - if !p.closed.Load() { - p.queueReconnect() - } - case webrtc.PeerConnectionStateClosed: - p.publisherReady.Store(false) - case webrtc.PeerConnectionStateUnknown, - webrtc.PeerConnectionStateNew, - webrtc.PeerConnectionStateConnecting: - } -} - -func (p *Peer) handleIncomingMessage(data []byte, source string) { - logger.Verbosef("[Jazz] Received %d bytes on %s DC (raw)", len(data), source) - - payload, ok := DecodeDataPacket(data) - if !ok { - logger.Debugf("[Jazz] Failed to decode DataPacket, trying raw") - if p.onData != nil && len(data) > 0 { - p.onData(data) - } - return - } - - logger.Verbosef("[Jazz] Decoded DataPacket: %d bytes payload", len(payload)) - if p.onData != nil && len(payload) > 0 { - p.onData(payload) - } -} - -func (p *Peer) handleSignaling(_ context.Context) { - for { - var msg map[string]any - if err := p.ws.ReadJSON(&msg); err != nil { - if !p.closed.Load() { - logger.Debugf("ws read error: %v", err) - p.queueReconnect() - } - return - } - - p.updateWSDeadline() - - event, _ := msg[keyEvent].(string) - payload, _ := msg[keyPayload].(map[string]any) - - switch event { - case "join-response": - p.handleJoinResponse(payload) - case "media-out": - p.handleMediaOut(payload) - } - } -} - -func (p *Peer) handleJoinResponse(payload map[string]any) { - group, _ := payload["participantGroup"].(map[string]any) - p.groupID, _ = group["groupId"].(string) - logger.Verbosef("Jazz peer joined: groupId=%s", p.groupID) -} - -func (p *Peer) handleMediaOut(payload map[string]any) { - method, _ := payload["method"].(string) - - switch method { - case "rtc:config": - p.handleRTCConfig(payload) - case "rtc:join": - logger.Verbosef("Jazz rtc:join received") - case "rtc:offer": - p.handleSubscriberOffer(payload) - case "rtc:answer": - p.handlePublisherAnswer(payload) - case "rtc:ice": - p.handleICE(payload) - } -} - -func (p *Peer) handleRTCConfig(payload map[string]any) { - config, _ := payload["configuration"].(map[string]any) - servers, _ := config["iceServers"].([]any) - - var iceServers []webrtc.ICEServer - for _, s := range servers { - server, _ := s.(map[string]any) - urls, _ := server["urls"].([]any) - username, _ := server["username"].(string) - credential, _ := server["credential"].(string) - - var urlStrs []string - for _, u := range urls { - if urlStr, ok := u.(string); ok && urlStr != "" { - urlStrs = append(urlStrs, urlStr) - } - } - - if len(urlStrs) > 0 { - iceServers = append(iceServers, webrtc.ICEServer{ - URLs: urlStrs, - Username: username, - Credential: credential, - }) - } - } - - if len(iceServers) > 0 { - newConfig := webrtc.Configuration{ - ICEServers: iceServers, - SDPSemantics: webrtc.SDPSemanticsUnifiedPlan, - BundlePolicy: webrtc.BundlePolicyMaxBundle, - } - _ = p.pcSub.SetConfiguration(newConfig) - _ = p.pcPub.SetConfiguration(newConfig) - } -} - -func (p *Peer) handleSubscriberOffer(payload map[string]any) { - desc, _ := payload["description"].(map[string]any) - sdp, _ := desc["sdp"].(string) - - if err := p.pcSub.SetRemoteDescription(webrtc.SessionDescription{ - Type: webrtc.SDPTypeOffer, - SDP: sdp, - }); err != nil { - logger.Debugf("set remote desc error: %v", err) - return - } - - answer, err := p.pcSub.CreateAnswer(nil) - if err != nil { - logger.Debugf("create answer error: %v", err) - return - } - - if err := p.pcSub.SetLocalDescription(answer); err != nil { - logger.Debugf("set local desc error: %v", err) - return - } - - p.wsMu.Lock() - _ = p.ws.WriteJSON(map[string]any{ - keyRoomID: p.roomInfo.RoomID, - keyEvent: "media-in", - "groupId": p.groupID, - keyRequestID: uuid.New().String(), - keyPayload: map[string]any{ - "method": "rtc:answer", - "description": map[string]any{ - "type": "answer", - "sdp": answer.SDP, - }, - }, - }) - p.wsMu.Unlock() - - time.Sleep(300 * time.Millisecond) - p.sendPublisherOffer() -} - -func (p *Peer) sendPublisherOffer() { - offer, err := p.pcPub.CreateOffer(nil) - if err != nil { - logger.Debugf("create pub offer error: %v", err) - return - } - - if err := p.pcPub.SetLocalDescription(offer); err != nil { - logger.Debugf("set local pub desc error: %v", err) - return - } - - p.wsMu.Lock() - _ = p.ws.WriteJSON(map[string]any{ - keyRoomID: p.roomInfo.RoomID, - keyEvent: "media-in", - "groupId": p.groupID, - keyRequestID: uuid.New().String(), - keyPayload: map[string]any{ - "method": "rtc:offer", - "description": map[string]any{ - "type": "offer", - "sdp": offer.SDP, - }, - }, - }) - p.wsMu.Unlock() -} - -func (p *Peer) handlePublisherAnswer(payload map[string]any) { - desc, _ := payload["description"].(map[string]any) - sdp, _ := desc["sdp"].(string) - - if err := p.pcPub.SetRemoteDescription(webrtc.SessionDescription{ - Type: webrtc.SDPTypeAnswer, - SDP: sdp, - }); err != nil { - logger.Debugf("set remote pub desc error: %v", err) - } -} - -func (p *Peer) handleICE(payload map[string]any) { - candidates, _ := payload["rtcIceCandidates"].([]any) - - for _, c := range candidates { - cand, _ := c.(map[string]any) - candStr, _ := cand["candidate"].(string) - target, _ := cand["target"].(string) - sdpMid, _ := cand["sdpMid"].(string) - sdpMLineIndex, _ := cand["sdpMLineIndex"].(float64) - - init := webrtc.ICECandidateInit{ - Candidate: candStr, - SDPMid: &sdpMid, - SDPMLineIndex: func() *uint16 { v := uint16(sdpMLineIndex); return &v }(), - } - - switch target { - case "SUBSCRIBER": - _ = p.pcSub.AddICECandidate(init) - case "PUBLISHER": - _ = p.pcPub.AddICECandidate(init) - } - } -} - -func (p *Peer) updateWSDeadline() { - p.wsMu.Lock() - if p.ws != nil { - _ = p.ws.SetReadDeadline(time.Now().Add(60 * time.Second)) - } - p.wsMu.Unlock() -} - -// Send queues data for transmission. -func (p *Peer) Send(data []byte) error { - if p.dc == nil || p.dc.ReadyState() != webrtc.DataChannelStateOpen { - return provider.ErrDataChannelNotReady - } - - if p.sendQueueClosed.Load() { - return provider.ErrSendQueueClosed - } - - select { - case p.sendQueue <- data: - return nil - case <-time.After(50 * time.Millisecond): - return provider.ErrSendQueueTimeout - } -} - -func (p *Peer) processSendQueue() { - for { - select { - case <-p.sessionCloseCh: - return - case <-p.closeCh: - return - case data := <-p.sendQueue: - if len(data) > maxDataChannelMessageSize { - logger.Debugf("[Jazz] Message too large: %d bytes (max %d)", len(data), maxDataChannelMessageSize) - continue - } - - encoded := EncodeDataPacket(data) - logger.Verbosef("[Jazz] Sending %d bytes (encoded to %d bytes)", len(data), len(encoded)) - - if err := p.dc.Send(encoded); err != nil { - logger.Debugf("send error: %v", err) - p.queueReconnect() - return - } - time.Sleep(sendDelay) - } - } -} - -// Close terminates the connection and releases resources. -func (p *Peer) Close() error { - p.closed.Store(true) - p.sendQueueClosed.Store(true) - - close(p.closeCh) - - done := make(chan struct{}) - go func() { - p.wg.Wait() - close(done) - }() - - select { - case <-done: - case <-time.After(2 * time.Second): - } - - if p.dc != nil { - _ = p.dc.Close() - } - if p.pcPub != nil { - _ = p.pcPub.Close() - } - if p.pcSub != nil { - _ = p.pcSub.Close() - } - if p.ws != nil { - p.wsMu.Lock() - _ = p.ws.WriteControl(websocket.CloseMessage, - websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""), - time.Now().Add(time.Second)) - _ = p.ws.Close() - p.wsMu.Unlock() - } - - return nil -} - -// AddVideoTrack adds a video track to the publisher peer connection. -func (p *Peer) AddVideoTrack(track webrtc.TrackLocal) error { - p.videoTrackMu.Lock() - p.videoTracks = append(p.videoTracks, track) - p.videoTrackMu.Unlock() - - if p.pcPub == nil { - return nil - } - if _, err := p.pcPub.AddTrack(track); err != nil { - return fmt.Errorf("failed to add track: %w", err) - } - return nil -} - -// SetVideoTrackHandler registers a callback for remote video tracks. -func (p *Peer) SetVideoTrackHandler(cb func(*webrtc.TrackRemote, *webrtc.RTPReceiver)) { - p.videoTrackMu.Lock() - defer p.videoTrackMu.Unlock() - p.onVideoTrack = cb -} - -// SetReconnectCallback sets the callback for reconnection events. -func (p *Peer) SetReconnectCallback(cb func(*webrtc.DataChannel)) { - p.onReconnect = cb -} - -// SetShouldReconnect sets the policy for reconnection. -func (p *Peer) SetShouldReconnect(fn func() bool) { - p.shouldReconnect = fn -} - -// SetEndedCallback sets the callback for connection termination. -func (p *Peer) SetEndedCallback(cb func(string)) { - p.onEnded = cb -} - -// WatchConnection monitors the connection lifecycle. -func (p *Peer) WatchConnection(ctx context.Context) { - for { - select { - case <-ctx.Done(): - return - case <-p.closeCh: - return - case <-p.reconnectCh: - } - } -} - -// CanSend checks if data can be sent. -func (p *Peer) CanSend() bool { - if p.onData == nil { - if p.hasLocalVideoTracks() { - return !p.closed.Load() && p.subscriberReady.Load() && p.publisherReady.Load() - } - return !p.closed.Load() && p.subscriberReady.Load() - } - if p.dc == nil || p.dc.ReadyState() != webrtc.DataChannelStateOpen { - return false - } - return len(p.sendQueue) < 4000 -} - -// GetSendQueue returns the transmission queue. -func (p *Peer) GetSendQueue() chan []byte { - return p.sendQueue -} - -// GetBufferedAmount returns the WebRTC buffered amount. -func (p *Peer) GetBufferedAmount() uint64 { - if p.dc != nil { - return p.dc.BufferedAmount() - } - return 0 -} - -func (p *Peer) queueReconnect() { - if p.closed.Load() || p.reconnecting.Load() { - return - } - if p.shouldReconnect != nil && !p.shouldReconnect() { - return - } - select { - case p.reconnectCh <- struct{}{}: - default: - } -} diff --git a/internal/provider/jazz/peer_helpers_test.go b/internal/provider/jazz/peer_helpers_test.go deleted file mode 100644 index ffa86e3..0000000 --- a/internal/provider/jazz/peer_helpers_test.go +++ /dev/null @@ -1,113 +0,0 @@ -package jazz - -import ( - "context" - "errors" - "testing" - - "github.com/openlibrecommunity/olcrtc/internal/provider" - "github.com/pion/webrtc/v4" -) - -//nolint:cyclop // table-driven test naturally has many branches -func TestPeerStateHelpers(t *testing.T) { - p := &Peer{ - reconnectCh: make(chan struct{}, 1), - closeCh: make(chan struct{}), - sessionCloseCh: make(chan struct{}), - sendQueue: make(chan []byte, 1), - subscriberConn: make(chan struct{}), - publisherConn: make(chan struct{}), - } - - p.resetMediaState() - if p.subscriberReady.Load() || p.publisherReady.Load() || p.subscriberConn == nil || p.publisherConn == nil { - t.Fatal("resetMediaState() did not reset readiness") - } - if p.hasLocalVideoTracks() { - t.Fatal("hasLocalVideoTracks() = true without tracks") - } - if err := p.AddVideoTrack(nil); err != nil { - t.Fatalf("AddVideoTrack(nil) error = %v", err) - } - if !p.hasLocalVideoTracks() { - t.Fatal("hasLocalVideoTracks() = false after AddVideoTrack") - } - - p.SetVideoTrackHandler(func(*webrtc.TrackRemote, *webrtc.RTPReceiver) {}) - if p.videoTrackHandler() == nil { - t.Fatal("videoTrackHandler() = nil") - } - - cfg := defaultWebRTCConfig() - if cfg.SDPSemantics != webrtc.SDPSemanticsUnifiedPlan || cfg.BundlePolicy != webrtc.BundlePolicyMaxBundle { - t.Fatalf("defaultWebRTCConfig() = %+v", cfg) - } - if p.buildAPI() == nil { - t.Fatal("buildAPI() returned nil") - } -} - -func TestPeerCallbacksQueueReconnectAndClose(t *testing.T) { - p := &Peer{ - reconnectCh: make(chan struct{}, 1), - closeCh: make(chan struct{}), - sessionCloseCh: make(chan struct{}), - sendQueue: make(chan []byte, 1), - } - - p.SetReconnectCallback(func(*webrtc.DataChannel) {}) - p.SetShouldReconnect(func() bool { return true }) - p.SetEndedCallback(func(string) {}) - if p.onReconnect == nil || p.shouldReconnect == nil || p.onEnded == nil { - t.Fatal("callbacks were not stored") - } - - p.queueReconnect() - select { - case <-p.reconnectCh: - default: - t.Fatal("queueReconnect() did not enqueue") - } - - p.SetShouldReconnect(func() bool { return false }) - p.queueReconnect() - select { - case <-p.reconnectCh: - t.Fatal("queueReconnect() enqueued despite policy=false") - default: - } - - done := make(chan struct{}) - go func() { - p.WatchConnection(context.Background()) - close(done) - }() - if err := p.Close(); err != nil { - t.Fatalf("Close() error = %v", err) - } - <-done - if err := p.Send([]byte("closed")); !errors.Is(err, provider.ErrDataChannelNotReady) { - t.Fatalf("Send() error = %v, want datachannel not ready", err) - } -} - -func TestPeerCanSendVideoOnlyModes(t *testing.T) { - p := &Peer{sendQueue: make(chan []byte, 1)} - p.subscriberReady.Store(true) - if !p.CanSend() { - t.Fatal("CanSend() = false for subscriber-ready peer without local video") - } - _ = p.AddVideoTrack(nil) - if p.CanSend() { - t.Fatal("CanSend() = true with local video but publisher not ready") - } - p.publisherReady.Store(true) - if !p.CanSend() { - t.Fatal("CanSend() = false with subscriber and publisher ready") - } - p.closed.Store(true) - if p.CanSend() { - t.Fatal("CanSend() = true for closed peer") - } -} diff --git a/internal/provider/jazz/provider.go b/internal/provider/jazz/provider.go deleted file mode 100644 index e5c8f8c..0000000 --- a/internal/provider/jazz/provider.go +++ /dev/null @@ -1,84 +0,0 @@ -// Package jazz implements the SaluteJazz WebRTC provider. -package jazz - -import ( - "context" - "fmt" - - "github.com/openlibrecommunity/olcrtc/internal/provider" - "github.com/pion/webrtc/v4" -) - -type jazzProvider struct { - peer *Peer -} - -// New creates a new SaluteJazz provider instance. -func New(ctx context.Context, cfg provider.Config) (provider.Provider, error) { - peer, err := NewPeer(ctx, cfg.RoomURL, cfg.Name, cfg.OnData) - if err != nil { - return nil, fmt.Errorf("create jazz peer: %w", err) - } - - return &jazzProvider{peer: peer}, nil -} - -// Connect starts the provider connection. -func (j *jazzProvider) Connect(ctx context.Context) error { - return j.peer.Connect(ctx) -} - -// Send transmits data to the room. -func (j *jazzProvider) Send(data []byte) error { - return j.peer.Send(data) -} - -// Close terminates the provider connection. -func (j *jazzProvider) Close() error { - return j.peer.Close() -} - -// SetReconnectCallback sets the function to call on reconnection. -func (j *jazzProvider) SetReconnectCallback(cb func(*webrtc.DataChannel)) { - j.peer.SetReconnectCallback(cb) -} - -// SetShouldReconnect sets the function to determine if reconnection should occur. -func (j *jazzProvider) SetShouldReconnect(fn func() bool) { - j.peer.SetShouldReconnect(fn) -} - -// SetEndedCallback sets the function to call when the session ends. -func (j *jazzProvider) SetEndedCallback(cb func(string)) { - j.peer.SetEndedCallback(cb) -} - -// WatchConnection monitors the provider connection state. -func (j *jazzProvider) WatchConnection(ctx context.Context) { - j.peer.WatchConnection(ctx) -} - -// CanSend checks if the provider is ready to transmit data. -func (j *jazzProvider) CanSend() bool { - return j.peer.CanSend() -} - -// GetSendQueue returns the data transmission queue. -func (j *jazzProvider) GetSendQueue() chan []byte { - return j.peer.GetSendQueue() -} - -// GetBufferedAmount returns the current WebRTC buffered amount. -func (j *jazzProvider) GetBufferedAmount() uint64 { - return j.peer.GetBufferedAmount() -} - -// AddVideoTrack adds a video track to the jazz connection. -func (j *jazzProvider) AddVideoTrack(track webrtc.TrackLocal) error { - return j.peer.AddVideoTrack(track) -} - -// SetVideoTrackHandler registers a callback for subscribed remote video tracks. -func (j *jazzProvider) SetVideoTrackHandler(cb func(*webrtc.TrackRemote, *webrtc.RTPReceiver)) { - j.peer.SetVideoTrackHandler(cb) -} diff --git a/internal/provider/jazz/provider_test.go b/internal/provider/jazz/provider_test.go deleted file mode 100644 index ab6741c..0000000 --- a/internal/provider/jazz/provider_test.go +++ /dev/null @@ -1,51 +0,0 @@ -package jazz - -import ( - "context" - "errors" - "testing" - - "github.com/openlibrecommunity/olcrtc/internal/provider" - "github.com/pion/webrtc/v4" -) - -func TestJazzProviderForwardsPeerMethods(t *testing.T) { - peer := &Peer{ - reconnectCh: make(chan struct{}, 1), - closeCh: make(chan struct{}), - sessionCloseCh: make(chan struct{}), - sendQueue: make(chan []byte, 1), - } - p := &jazzProvider{peer: peer} - - p.SetReconnectCallback(func(*webrtc.DataChannel) {}) - p.SetShouldReconnect(func() bool { return true }) - p.SetEndedCallback(func(string) {}) - p.SetVideoTrackHandler(func(*webrtc.TrackRemote, *webrtc.RTPReceiver) {}) - if peer.onReconnect == nil || peer.shouldReconnect == nil || peer.onEnded == nil || peer.onVideoTrack == nil { - t.Fatal("callbacks were not forwarded") - } - - if p.GetSendQueue() != peer.sendQueue { - t.Fatal("GetSendQueue() did not forward") - } - if p.GetBufferedAmount() != 0 { - t.Fatal("GetBufferedAmount() != 0 with nil datachannel") - } - if err := p.AddVideoTrack(nil); err != nil { - t.Fatalf("AddVideoTrack(nil) error = %v", err) - } - if err := p.Send([]byte("x")); !errors.Is(err, provider.ErrDataChannelNotReady) { - t.Fatalf("Send() error = %v, want datachannel not ready", err) - } - - done := make(chan struct{}) - go func() { - p.WatchConnection(context.Background()) - close(done) - }() - if err := p.Close(); err != nil { - t.Fatalf("Close() error = %v", err) - } - <-done -}