From d65784ff8c8de7a9dc72bd0d9a9c626b7f4b60e7 Mon Sep 17 00:00:00 2001 From: zarazaex69 Date: Mon, 11 May 2026 13:13:21 +0300 Subject: [PATCH] refactor: migrate telemost to engine/goolom + auth/telemost MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Decompose the monolithic internal/provider/telemost package into two orthogonal layers: engine/goolom (Yandex proprietary SFU wire protocol — WebSocket signaling, dual pub/sub PeerConnections, DataChannel, telemetry) and auth/telemost (HTTP connection-info fetch → engine.Credentials). Add engine.Config.Refresh callback so Goolom can obtain fresh peerID and credentials on every reconnect without a direct dependency on the auth package. engine_adapter wires the Refresh closure from authProvider.Issue. Delete internal/provider/ entirely (telemost was the last tenant) and remove the now-obsolete provider_adapter + its test from builtin. Co-Authored-By: Claude Opus 4.7 --- internal/{provider => auth}/telemost/api.go | 7 +- internal/auth/telemost/telemost.go | 43 + internal/carrier/builtin/engine_adapter.go | 12 +- internal/carrier/builtin/provider_adapter.go | 121 -- .../carrier/builtin/provider_adapter_test.go | 205 --- internal/carrier/builtin/register.go | 31 +- internal/engine/engine.go | 14 + internal/engine/goolom/lifecycle.go | 422 +++++ internal/engine/goolom/media.go | 318 ++++ internal/engine/goolom/session.go | 322 ++++ internal/engine/goolom/signaling.go | 303 ++++ internal/engine/goolom/state.go | 246 +++ internal/provider/provider.go | 50 - internal/provider/telemost/api_test.go | 83 - internal/provider/telemost/peer.go | 1514 ----------------- .../provider/telemost/peer_helpers_test.go | 196 --- internal/provider/telemost/provider.go | 84 - internal/provider/telemost/provider_test.go | 55 - .../provider/telemost/state_helpers_test.go | 85 - 19 files changed, 1687 insertions(+), 2424 deletions(-) rename internal/{provider => auth}/telemost/api.go (86%) create mode 100644 internal/auth/telemost/telemost.go delete mode 100644 internal/carrier/builtin/provider_adapter.go delete mode 100644 internal/carrier/builtin/provider_adapter_test.go create mode 100644 internal/engine/goolom/lifecycle.go create mode 100644 internal/engine/goolom/media.go create mode 100644 internal/engine/goolom/session.go create mode 100644 internal/engine/goolom/signaling.go create mode 100644 internal/engine/goolom/state.go delete mode 100644 internal/provider/provider.go delete mode 100644 internal/provider/telemost/api_test.go delete mode 100644 internal/provider/telemost/peer.go delete mode 100644 internal/provider/telemost/peer_helpers_test.go delete mode 100644 internal/provider/telemost/provider.go delete mode 100644 internal/provider/telemost/provider_test.go delete mode 100644 internal/provider/telemost/state_helpers_test.go diff --git a/internal/provider/telemost/api.go b/internal/auth/telemost/api.go similarity index 86% rename from internal/provider/telemost/api.go rename to internal/auth/telemost/api.go index 2afd298..cde00f0 100644 --- a/internal/provider/telemost/api.go +++ b/internal/auth/telemost/api.go @@ -1,3 +1,9 @@ +// Package telemost is the auth provider for the Yandex Telemost service. +// It fetches the connection metadata (media server URL, peer ID, room ID, +// signing credentials) the Goolom engine needs to join a conference. +// +// Telemost does not expose an API to create rooms — they originate in the +// Yandex UI — so this provider does not implement auth.RoomCreator. package telemost import ( @@ -71,6 +77,5 @@ func GetConnectionInfo(ctx context.Context, roomURL, displayName string) (*Conne if err := json.NewDecoder(resp.Body).Decode(&info); err != nil { return nil, fmt.Errorf("failed to decode response: %w", err) } - return &info, nil } diff --git a/internal/auth/telemost/telemost.go b/internal/auth/telemost/telemost.go new file mode 100644 index 0000000..6774db1 --- /dev/null +++ b/internal/auth/telemost/telemost.go @@ -0,0 +1,43 @@ +package telemost + +import ( + "context" + "fmt" + + "github.com/openlibrecommunity/olcrtc/internal/auth" +) + +// Provider produces Goolom credentials for the Yandex Telemost service. +type Provider struct{} + +// Engine reports which engine consumes credentials from this auth provider. +func (Provider) Engine() string { return "goolom" } + +// Issue fetches connection info for a Telemost room and returns engine credentials. +// +// cfg.RoomURL must be a Telemost conference URL (e.g. +// https://telemost.yandex.ru/j/). Room creation is not supported by the +// Telemost API; rooms originate in the Yandex UI. +func (Provider) Issue(ctx context.Context, cfg auth.Config) (auth.Credentials, error) { + if cfg.RoomURL == "" { + return auth.Credentials{}, auth.ErrRoomIDRequired + } + info, err := GetConnectionInfo(ctx, cfg.RoomURL, cfg.Name) + if err != nil { + return auth.Credentials{}, fmt.Errorf("get connection info: %w", err) + } + return auth.Credentials{ + URL: info.ClientConfig.MediaServerURL, + Token: info.PeerID, + Extra: map[string]string{ + "roomID": info.RoomID, + "credentials": info.Credentials, + "roomURL": cfg.RoomURL, + "telemetryReferer": cfg.RoomURL, + }, + }, nil +} + +func init() { //nolint:gochecknoinits // auth registration is the canonical Go pattern for plugins + auth.Register("telemost", Provider{}) +} diff --git a/internal/carrier/builtin/engine_adapter.go b/internal/carrier/builtin/engine_adapter.go index fe12a11..9827623 100644 --- a/internal/carrier/builtin/engine_adapter.go +++ b/internal/carrier/builtin/engine_adapter.go @@ -15,13 +15,14 @@ import ( // reports. func registerEngineAuth(carrierName string, authProvider auth.Provider) { carrier.Register(carrierName, func(ctx context.Context, cfg carrier.Config) (carrier.Session, error) { - creds, err := authProvider.Issue(ctx, auth.Config{ + authCfg := auth.Config{ RoomURL: cfg.RoomURL, Name: cfg.Name, DNSServer: cfg.DNSServer, ProxyAddr: cfg.ProxyAddr, ProxyPort: cfg.ProxyPort, - }) + } + creds, err := authProvider.Issue(ctx, authCfg) if err != nil { return nil, fmt.Errorf("auth issue: %w", err) } @@ -35,6 +36,13 @@ func registerEngineAuth(carrierName string, authProvider auth.Provider) { DNSServer: cfg.DNSServer, ProxyAddr: cfg.ProxyAddr, ProxyPort: cfg.ProxyPort, + Refresh: func(ctx context.Context) (engine.Credentials, error) { + fresh, err := authProvider.Issue(ctx, authCfg) + if err != nil { + return engine.Credentials{}, fmt.Errorf("auth refresh: %w", err) + } + return engine.Credentials{URL: fresh.URL, Token: fresh.Token, Extra: fresh.Extra}, nil + }, }) if err != nil { return nil, fmt.Errorf("engine new: %w", err) diff --git a/internal/carrier/builtin/provider_adapter.go b/internal/carrier/builtin/provider_adapter.go deleted file mode 100644 index ced340e..0000000 --- a/internal/carrier/builtin/provider_adapter.go +++ /dev/null @@ -1,121 +0,0 @@ -package builtin - -import ( - "context" - "fmt" - - "github.com/openlibrecommunity/olcrtc/internal/carrier" - "github.com/openlibrecommunity/olcrtc/internal/provider" - "github.com/pion/webrtc/v4" -) - -type providerSession struct { - provider provider.Provider -} - -func (s *providerSession) Capabilities() carrier.Capabilities { - caps := carrier.Capabilities{ByteStream: true} - _, caps.VideoTrack = s.provider.(videoTrackProvider) - return caps -} - -func (s *providerSession) OpenByteStream() (carrier.ByteStream, error) { - return &providerByteStream{provider: s.provider}, nil -} - -func (s *providerSession) OpenVideoTrack() (carrier.VideoTrack, error) { - vtp, ok := s.provider.(videoTrackProvider) - if !ok { - return nil, carrier.ErrVideoTrackUnsupported - } - return &providerVideoTrack{provider: vtp}, nil -} - -type videoTrackProvider interface { - provider.Provider - provider.VideoTrackCapable -} - -type providerByteStream struct { - provider provider.Provider -} - -func (p *providerByteStream) Connect(ctx context.Context) error { - if err := p.provider.Connect(ctx); err != nil { - return fmt.Errorf("connect: %w", err) - } - return nil -} - -func (p *providerByteStream) Send(data []byte) error { - if err := p.provider.Send(data); err != nil { - return fmt.Errorf("send: %w", err) - } - return nil -} - -func (p *providerByteStream) Close() error { - if err := p.provider.Close(); err != nil { - return fmt.Errorf("close: %w", err) - } - return nil -} - -func (p *providerByteStream) SetReconnectCallback(cb func()) { - p.provider.SetReconnectCallback(func(_ *webrtc.DataChannel) { - if cb != nil { - cb() - } - }) -} - -func (p *providerByteStream) SetShouldReconnect(fn func() bool) { p.provider.SetShouldReconnect(fn) } -func (p *providerByteStream) SetEndedCallback(cb func(string)) { p.provider.SetEndedCallback(cb) } -func (p *providerByteStream) WatchConnection(ctx context.Context) { - p.provider.WatchConnection(ctx) -} -func (p *providerByteStream) CanSend() bool { return p.provider.CanSend() } - -type providerVideoTrack struct { - provider videoTrackProvider -} - -func (v *providerVideoTrack) Connect(ctx context.Context) error { - if err := v.provider.Connect(ctx); err != nil { - return fmt.Errorf("connect: %w", err) - } - return nil -} - -func (v *providerVideoTrack) Close() error { - if err := v.provider.Close(); err != nil { - return fmt.Errorf("close: %w", err) - } - return nil -} - -func (v *providerVideoTrack) SetReconnectCallback(cb func()) { - v.provider.SetReconnectCallback(func(_ *webrtc.DataChannel) { - if cb != nil { - cb() - } - }) -} - -func (v *providerVideoTrack) SetShouldReconnect(fn func() bool) { v.provider.SetShouldReconnect(fn) } -func (v *providerVideoTrack) SetEndedCallback(cb func(string)) { v.provider.SetEndedCallback(cb) } -func (v *providerVideoTrack) WatchConnection(ctx context.Context) { - v.provider.WatchConnection(ctx) -} -func (v *providerVideoTrack) CanSend() bool { return v.provider.CanSend() } - -func (v *providerVideoTrack) AddTrack(track webrtc.TrackLocal) error { - if err := v.provider.AddVideoTrack(track); err != nil { - return fmt.Errorf("add track: %w", err) - } - return nil -} - -func (v *providerVideoTrack) SetTrackHandler(cb func(*webrtc.TrackRemote, *webrtc.RTPReceiver)) { - v.provider.SetVideoTrackHandler(cb) -} diff --git a/internal/carrier/builtin/provider_adapter_test.go b/internal/carrier/builtin/provider_adapter_test.go deleted file mode 100644 index 6ba35d9..0000000 --- a/internal/carrier/builtin/provider_adapter_test.go +++ /dev/null @@ -1,205 +0,0 @@ -package builtin - -import ( - "context" - "errors" - "testing" - - "github.com/openlibrecommunity/olcrtc/internal/carrier" - "github.com/pion/webrtc/v4" -) - -var ( - errConnectBoom = errors.New("connect boom") - errSendBoom = errors.New("send boom") - errCloseBoom = errors.New("close boom") - errTrackBoom = errors.New("track boom") -) - -type stubProvider struct { - connectErr error - sendErr error - closeErr error - canSend bool - reconnectCallback func(*webrtc.DataChannel) - shouldReconnect func() bool - endedCallback func(string) - watchCalled bool - addTrackErr error - trackHandlerCalled bool -} - -func (s *stubProvider) Connect(context.Context) error { return s.connectErr } -func (s *stubProvider) Send([]byte) error { return s.sendErr } -func (s *stubProvider) Close() error { return s.closeErr } -func (s *stubProvider) SetReconnectCallback(cb func(*webrtc.DataChannel)) { s.reconnectCallback = cb } -func (s *stubProvider) SetShouldReconnect(fn func() bool) { s.shouldReconnect = fn } -func (s *stubProvider) SetEndedCallback(cb func(string)) { s.endedCallback = cb } -func (s *stubProvider) WatchConnection(context.Context) { s.watchCalled = true } -func (s *stubProvider) CanSend() bool { return s.canSend } -func (s *stubProvider) GetSendQueue() chan []byte { return nil } -func (s *stubProvider) GetBufferedAmount() uint64 { return 0 } -func (s *stubProvider) AddVideoTrack(webrtc.TrackLocal) error { return s.addTrackErr } -func (s *stubProvider) SetVideoTrackHandler(func(*webrtc.TrackRemote, *webrtc.RTPReceiver)) { - s.trackHandlerCalled = true -} - -type plainProvider struct { - connectErr error - sendErr error - closeErr error - canSend bool - reconnectCallback func(*webrtc.DataChannel) - shouldReconnect func() bool - endedCallback func(string) - watchCalled bool -} - -func (p *plainProvider) Connect(context.Context) error { return p.connectErr } -func (p *plainProvider) Send([]byte) error { return p.sendErr } -func (p *plainProvider) Close() error { return p.closeErr } -func (p *plainProvider) SetReconnectCallback(cb func(*webrtc.DataChannel)) { p.reconnectCallback = cb } -func (p *plainProvider) SetShouldReconnect(fn func() bool) { p.shouldReconnect = fn } -func (p *plainProvider) SetEndedCallback(cb func(string)) { p.endedCallback = cb } -func (p *plainProvider) WatchConnection(context.Context) { p.watchCalled = true } -func (p *plainProvider) CanSend() bool { return p.canSend } -func (p *plainProvider) GetSendQueue() chan []byte { return nil } -func (p *plainProvider) GetBufferedAmount() uint64 { return 0 } - -func TestProviderSessionOpenVideoTrackUnsupported(t *testing.T) { - sess := &providerSession{provider: &plainProvider{}} - - caps := sess.Capabilities() - if !caps.ByteStream || caps.VideoTrack { - t.Fatalf("Capabilities() = %+v, want byte true and video false", caps) - } - - _, err := sess.OpenVideoTrack() - if !errors.Is(err, carrier.ErrVideoTrackUnsupported) { - t.Fatalf("OpenVideoTrack() error = %v, want %v", err, carrier.ErrVideoTrackUnsupported) - } -} - -func TestProviderByteStreamWrapsProviderAndCallbacks(t *testing.T) { - prov := &stubProvider{canSend: true} - stream := &providerByteStream{provider: prov} - - called := false - stream.SetReconnectCallback(func() { called = true }) - if prov.reconnectCallback == nil { - t.Fatal("SetReconnectCallback() did not install provider callback") - } - prov.reconnectCallback(nil) - if !called { - t.Fatal("reconnect callback was not adapted") - } - - reconnectAllowed := false - stream.SetShouldReconnect(func() bool { reconnectAllowed = true; return true }) - if prov.shouldReconnect == nil || !prov.shouldReconnect() || !reconnectAllowed { - t.Fatal("SetShouldReconnect() was not forwarded") - } - - ended := "" - stream.SetEndedCallback(func(reason string) { ended = reason }) - if prov.endedCallback == nil { - t.Fatal("SetEndedCallback() was not forwarded") - } - prov.endedCallback("bye") - if ended != "bye" { - t.Fatalf("ended callback reason = %q, want bye", ended) - } - - stream.WatchConnection(context.Background()) - if !prov.watchCalled { - t.Fatal("WatchConnection() was not forwarded") - } - if !stream.CanSend() { - t.Fatal("CanSend() = false, want true") - } -} - -func TestProviderByteStreamWrapsErrors(t *testing.T) { - prov := &stubProvider{ - connectErr: errConnectBoom, - sendErr: errSendBoom, - closeErr: errCloseBoom, - } - stream := &providerByteStream{provider: prov} - - if err := stream.Connect(context.Background()); err == nil || err.Error() != "connect: connect boom" { - t.Fatalf("Connect() error = %v", err) - } - if err := stream.Send([]byte("x")); err == nil || err.Error() != "send: send boom" { - t.Fatalf("Send() error = %v", err) - } - if err := stream.Close(); err == nil || err.Error() != "close: close boom" { - t.Fatalf("Close() error = %v", err) - } -} - -func TestProviderSessionOpenByteStreamAndVideoTrack(t *testing.T) { - prov := &stubProvider{canSend: true} - sess := &providerSession{provider: prov} - - stream, err := sess.OpenByteStream() - if err != nil { - t.Fatalf("OpenByteStream() error = %v", err) - } - if !stream.CanSend() { - t.Fatal("byte stream CanSend() = false, want true") - } - - video, err := sess.OpenVideoTrack() - if err != nil { - t.Fatalf("OpenVideoTrack() error = %v", err) - } - if err := video.Connect(context.Background()); err != nil { - t.Fatalf("video Connect() error = %v", err) - } - if err := video.Close(); err != nil { - t.Fatalf("video Close() error = %v", err) - } - video.SetShouldReconnect(func() bool { return true }) - video.SetEndedCallback(func(string) {}) - video.WatchConnection(context.Background()) - if !video.CanSend() || prov.shouldReconnect == nil || prov.endedCallback == nil || !prov.watchCalled { - t.Fatal("video adapter did not forward calls") - } -} - -func TestProviderVideoTrackWrapsOperations(t *testing.T) { - prov := &stubProvider{canSend: true, addTrackErr: errTrackBoom} - track := &providerVideoTrack{provider: prov} - - called := false - track.SetReconnectCallback(func() { called = true }) - prov.reconnectCallback(nil) - if !called { - t.Fatal("reconnect callback was not adapted") - } - - track.SetTrackHandler(func(*webrtc.TrackRemote, *webrtc.RTPReceiver) {}) - if !prov.trackHandlerCalled { - t.Fatal("SetTrackHandler() was not forwarded") - } - - if err := track.AddTrack(nil); err == nil || err.Error() != "add track: track boom" { - t.Fatalf("AddTrack() error = %v", err) - } -} - -func TestProviderVideoTrackWrapsConnectCloseErrors(t *testing.T) { - prov := &stubProvider{ - connectErr: errConnectBoom, - closeErr: errCloseBoom, - } - track := &providerVideoTrack{provider: prov} - - if err := track.Connect(context.Background()); err == nil || err.Error() != "connect: connect boom" { - t.Fatalf("Connect() error = %v", err) - } - if err := track.Close(); err == nil || err.Error() != "close: close boom" { - t.Fatalf("Close() error = %v", err) - } -} diff --git a/internal/carrier/builtin/register.go b/internal/carrier/builtin/register.go index 0f2f7a1..73a815c 100644 --- a/internal/carrier/builtin/register.go +++ b/internal/carrier/builtin/register.go @@ -2,42 +2,17 @@ package builtin import ( - "context" - authSaluteJazz "github.com/openlibrecommunity/olcrtc/internal/auth/salutejazz" + authTelemost "github.com/openlibrecommunity/olcrtc/internal/auth/telemost" authWBStream "github.com/openlibrecommunity/olcrtc/internal/auth/wbstream" - "github.com/openlibrecommunity/olcrtc/internal/carrier" + _ "github.com/openlibrecommunity/olcrtc/internal/engine/goolom" // engine registration via init _ "github.com/openlibrecommunity/olcrtc/internal/engine/livekit" // engine registration via init _ "github.com/openlibrecommunity/olcrtc/internal/engine/salutejazz" // engine registration via init - "github.com/openlibrecommunity/olcrtc/internal/provider" - "github.com/openlibrecommunity/olcrtc/internal/provider/telemost" ) -type providerFactory func(context.Context, provider.Config) (provider.Provider, error) - // Register wires the built-in carriers into the carrier registry. func Register() { - // Legacy provider-based carriers (still being migrated to engine+auth). - registerProvider("telemost", telemost.New) - - // Migrated to engine+auth. registerEngineAuth("wbstream", authWBStream.Provider{}) registerEngineAuth("jazz", authSaluteJazz.Provider{}) -} - -func registerProvider(name string, factory providerFactory) { - carrier.Register(name, func(ctx context.Context, cfg carrier.Config) (carrier.Session, error) { - prov, err := factory(ctx, provider.Config{ - RoomURL: cfg.RoomURL, - Name: cfg.Name, - OnData: cfg.OnData, - DNSServer: cfg.DNSServer, - ProxyAddr: cfg.ProxyAddr, - ProxyPort: cfg.ProxyPort, - }) - if err != nil { - return nil, err - } - return &providerSession{provider: prov}, nil - }) + registerEngineAuth("telemost", authTelemost.Provider{}) } diff --git a/internal/engine/engine.go b/internal/engine/engine.go index 3b037e0..c69e23a 100644 --- a/internal/engine/engine.go +++ b/internal/engine/engine.go @@ -30,10 +30,23 @@ type Capabilities struct { VideoTrack bool } +// Credentials are produced by an auth provider — duplicated here to avoid an +// import cycle between engine and auth. +type Credentials struct { + URL string + Token string + Extra map[string]string +} + // Config is the runtime input to an engine factory. URL/Token are produced by // an auth provider (or supplied directly by the caller for "none" auth). // Extra carries engine-specific fields that don't fit the common shape // (e.g. SaluteJazz needs a separate room password alongside the room ID). +// +// Refresh, when set, is called by an engine whose protocol requires fresh +// credentials on each reconnect (e.g. Goolom: every reconnect needs a new +// peerID/credentials tuple from the room-info HTTP endpoint). Engines that +// don't need this should ignore it. type Config struct { URL string Token string @@ -43,6 +56,7 @@ type Config struct { DNSServer string ProxyAddr string ProxyPort int + Refresh func(ctx context.Context) (Credentials, error) } // Session is the engine-level runtime handle. It is shaped to match what diff --git a/internal/engine/goolom/lifecycle.go b/internal/engine/goolom/lifecycle.go new file mode 100644 index 0000000..77fae7f --- /dev/null +++ b/internal/engine/goolom/lifecycle.go @@ -0,0 +1,422 @@ +package goolom + +import ( + "context" + "fmt" + "time" + + "github.com/google/uuid" + "github.com/gorilla/websocket" + "github.com/openlibrecommunity/olcrtc/internal/engine" + "github.com/openlibrecommunity/olcrtc/internal/logger" + "github.com/openlibrecommunity/olcrtc/internal/protect" + "github.com/pion/webrtc/v4" +) + +// Connect starts the WebRTC connection process. +func (s *Session) Connect(ctx context.Context) error { + s.closed.Store(false) + s.resetMediaState() + + config := webrtc.Configuration{ + ICEServers: []webrtc.ICEServer{{URLs: []string{"stun:stun.rtc.yandex.net:3478"}}}, + SDPSemantics: webrtc.SDPSemanticsUnifiedPlan, + } + + if err := s.setupPeerConnections(config); err != nil { + return err + } + + keepAliveCh, sessionCloseCh := s.resetSession() + var dcReady chan struct{} + if s.onData != nil { + var err error + s.dc, err = s.pcPub.CreateDataChannel("olcrtc", nil) + if err != nil { + return fmt.Errorf("create dc: %w", err) + } + dcReady = make(chan struct{}) + s.setupDataChannelHandlers(dcReady, sessionCloseCh) + } + + if err := s.dialWebSocket(); err != nil { + return err + } + + s.setupICEHandlers() + s.startBackgroundGoroutines(ctx, keepAliveCh) + + if s.onData != nil { + select { + case <-dcReady: + return nil + case <-time.After(15 * time.Second): + return ErrDataChannelTimeout + case <-ctx.Done(): + return fmt.Errorf("connect context cancelled: %w", ctx.Err()) + } + } + + return s.waitForMediaReady(ctx, 20*time.Second) +} + +func (s *Session) waitForMediaReady(ctx context.Context, timeout time.Duration) error { + timer := time.NewTimer(timeout) + defer timer.Stop() + + select { + case <-s.subscriberConn: + case <-timer.C: + return ErrSubscriberMediaTimeout + case <-ctx.Done(): + return fmt.Errorf("connect context cancelled: %w", ctx.Err()) + } + return nil +} + +func (s *Session) setupPeerConnections(config webrtc.Configuration) error { + settingEngine := webrtc.SettingEngine{} + if protect.Protector != nil { + settingEngine.SetICEProxyDialer(protect.NewProxyDialer()) + } + api := webrtc.NewAPI(webrtc.WithSettingEngine(settingEngine)) + + var err error + s.pcSub, err = api.NewPeerConnection(config) + if err != nil { + return fmt.Errorf("new sub pc: %w", err) + } + s.pcSub.OnConnectionStateChange(s.onSubscriberConnectionStateChange) + s.pcSub.OnTrack(func(track *webrtc.TrackRemote, receiver *webrtc.RTPReceiver) { + if track.Kind() != webrtc.RTPCodecTypeVideo { + return + } + logger.Infof("goolom remote video track: codec=%s stream=%s track=%s", + track.Codec().MimeType, track.StreamID(), track.ID()) + if cb := s.videoTrackHandler(); cb != nil { + cb(track, receiver) + } + }) + + s.pcPub, err = api.NewPeerConnection(config) + if err != nil { + return fmt.Errorf("new pub pc: %w", err) + } + s.pcPub.OnConnectionStateChange(s.onPublisherConnectionStateChange) + + if err := s.attachPendingVideoTracks(); err != nil { + return err + } + return nil +} + +func (s *Session) dialWebSocket() error { + wsDialer := websocket.Dialer{ + NetDialContext: protect.DialContext, + HandshakeTimeout: wsHandshakeTimeout, + } + ws, resp, err := wsDialer.Dial(s.mediaServerURL, nil) + if err != nil { + return fmt.Errorf("dial ws: %w", err) + } + if resp != nil && resp.Body != nil { + _ = resp.Body.Close() + } + s.ws = ws + + ws.SetPongHandler(func(string) error { + _ = ws.SetReadDeadline(time.Now().Add(wsReadTimeout)) + return nil + }) + _ = ws.SetReadDeadline(time.Now().Add(wsReadTimeout)) + return nil +} + +func (s *Session) startBackgroundGoroutines(ctx context.Context, keepAliveCh chan struct{}) { + s.wg.Add(1) + go func() { + defer s.wg.Done() + s.keepAlive(keepAliveCh) + }() + + _ = s.sendHello() + + s.wg.Add(1) + go func() { + defer s.wg.Done() + s.handleSignaling(ctx) + }() +} + +func (s *Session) onConnectionStateChange(state webrtc.PeerConnectionState) { + if !s.closed.Load() && state == webrtc.PeerConnectionStateFailed { + s.queueReconnect() + } +} + +func (s *Session) onSubscriberConnectionStateChange(state webrtc.PeerConnectionState) { + logger.Debugf("goolom subscriber state: %s", state.String()) + switch state { + case webrtc.PeerConnectionStateConnected: + s.subscriberReady.Store(true) + closeSignal(s.subscriberConn) + case webrtc.PeerConnectionStateDisconnected, + webrtc.PeerConnectionStateFailed, + webrtc.PeerConnectionStateClosed: + s.subscriberReady.Store(false) + case webrtc.PeerConnectionStateUnknown, + webrtc.PeerConnectionStateNew, + webrtc.PeerConnectionStateConnecting: + } + s.onConnectionStateChange(state) +} + +func (s *Session) onPublisherConnectionStateChange(state webrtc.PeerConnectionState) { + logger.Debugf("goolom publisher state: %s", state.String()) + switch state { + case webrtc.PeerConnectionStateConnected: + s.publisherReady.Store(true) + closeSignal(s.publisherConn) + case webrtc.PeerConnectionStateDisconnected, + webrtc.PeerConnectionStateFailed, + webrtc.PeerConnectionStateClosed: + s.publisherReady.Store(false) + case webrtc.PeerConnectionStateUnknown, + webrtc.PeerConnectionStateNew, + webrtc.PeerConnectionStateConnecting: + } + s.onConnectionStateChange(state) +} + +// Close terminates the session and releases resources. +func (s *Session) Close() error { + alreadyClosing := s.closed.Swap(true) + s.sendQueueClosed.Store(true) + + if !alreadyClosing { + leaveUID := uuid.New().String() + leaveAck := s.registerAckWaiter(leaveUID) + if s.sendLeave(leaveUID) { + _ = s.waitForAck(leaveUID, leaveAck, 1500*time.Millisecond) + } else { + s.removeAckWaiter(leaveUID) + } + } + + closeSignal(s.closeCh) + s.stopSession() + + if s.dc != nil { + _ = s.dc.Close() + } + if s.pcPub != nil { + _ = s.pcPub.Close() + } + if s.pcSub != nil { + _ = s.pcSub.Close() + } + if s.ws != nil { + s.wsMu.Lock() + _ = s.ws.WriteControl(websocket.CloseMessage, + websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""), + time.Now().Add(time.Second)) + _ = s.ws.Close() + s.wsMu.Unlock() + } + + done := make(chan struct{}) + go func() { + s.wg.Wait() + close(done) + }() + + select { + case <-done: + case <-time.After(2 * time.Second): + } + return nil +} + +// WatchConnection monitors the connection lifecycle and reconnects as needed. +func (s *Session) WatchConnection(ctx context.Context) { + const maxReconnects = 10 + const reconnectWindow = 5 * time.Minute + + for { + select { + case <-ctx.Done(): + return + case <-s.closeCh: + return + case <-s.reconnectCh: + if s.handleReconnectAttempt(ctx, maxReconnects, reconnectWindow) { + return + } + } + } +} + +func (s *Session) handleReconnectAttempt(ctx context.Context, maxReconnects int, reconnectWindow time.Duration) bool { + if time.Since(s.lastReconnect) > reconnectWindow { + s.reconnectCount = 0 + } + s.reconnectCount++ + s.lastReconnect = time.Now() + + if s.reconnectCount > maxReconnects { + s.signalEnded("reconnect limit reached") + return true + } + + backoff := time.Duration(s.reconnectCount) * 2 * time.Second + if backoff > 30*time.Second { + backoff = 30 * time.Second + } + return s.retryReconnect(ctx, backoff) +} + +func (s *Session) retryReconnect(ctx context.Context, backoff time.Duration) bool { + for { + if err := s.reconnect(ctx); err != nil { + logger.Debugf("reconnect failed: %v", err) + select { + case <-ctx.Done(): + return true + case <-s.closeCh: + return true + case <-time.After(backoff): + continue + } + } + break + } + return false +} + +func (s *Session) reconnect(ctx context.Context) error { + s.reconnecting.Store(true) + defer s.reconnecting.Store(false) + + s.sendLeave(uuid.New().String()) + time.Sleep(500 * time.Millisecond) + s.stopSession() + + if s.dc != nil { + _ = s.dc.Close() + } + if s.pcPub != nil { + _ = s.pcPub.Close() + } + if s.pcSub != nil { + _ = s.pcSub.Close() + } + if s.ws != nil { + s.wsMu.Lock() + _ = s.ws.WriteControl(websocket.CloseMessage, + websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""), + time.Now().Add(time.Second)) + _ = s.ws.Close() + s.wsMu.Unlock() + } + + if s.onReconnect != nil { + s.onReconnect(nil) + } + + time.Sleep(3 * time.Second) + if s.refresh == nil { + return ErrNoRefresh + } + creds, err := s.refresh(ctx) + if err != nil { + return fmt.Errorf("reconnect refresh: %w", err) + } + s.applyRefreshedCredentials(creds) + + if err := s.Connect(ctx); err != nil { + return err + } + if s.onReconnect != nil { + s.onReconnect(s.dc) + } + s.drainReconnectQueue() + return nil +} + +func (s *Session) applyRefreshedCredentials(creds engine.Credentials) { + if creds.URL != "" { + s.mediaServerURL = creds.URL + } + if creds.Token != "" { + s.peerID = creds.Token + } + if creds.Extra == nil { + return + } + if v := creds.Extra[credentialKeyRoomID]; v != "" { + s.roomID = v + } + if v := creds.Extra[credentialKeyCredentials]; v != "" { + s.credentials = v + } + if v := creds.Extra[credentialKeyRoomURL]; v != "" { + s.roomURL = v + } + if v := creds.Extra[credentialKeyTelemetryReferer]; v != "" { + s.telemetryReferer = v + } +} + +func (s *Session) drainReconnectQueue() { + for { + select { + case <-s.reconnectCh: + default: + return + } + } +} + +func (s *Session) queueReconnect() { + if s.closed.Load() || s.reconnecting.Load() { + return + } + if s.shouldReconnect != nil && !s.shouldReconnect() { + return + } + select { + case s.reconnectCh <- struct{}{}: + default: + } +} + +func (s *Session) stopSession() { + s.stopTelemetry() + s.sessionMu.Lock() + closeSignal(s.keepAliveCh) + closeSignal(s.sessionCloseCh) + s.sessionMu.Unlock() +} + +func (s *Session) resetSession() (chan struct{}, chan struct{}) { + s.sessionMu.Lock() + defer s.sessionMu.Unlock() + s.keepAliveCh = make(chan struct{}) + s.sessionCloseCh = make(chan struct{}) + return s.keepAliveCh, s.sessionCloseCh +} + +func (s *Session) resetMediaState() { + s.subscriberReady.Store(false) + s.publisherReady.Store(false) + s.subscriberConn = make(chan struct{}) + s.publisherConn = make(chan struct{}) +} + +func (s *Session) signalEnded(reason string) { + s.closed.Store(true) + s.stopTelemetry() + if s.onEnded != nil { + s.onEnded(reason) + } +} diff --git a/internal/engine/goolom/media.go b/internal/engine/goolom/media.go new file mode 100644 index 0000000..ba2118f --- /dev/null +++ b/internal/engine/goolom/media.go @@ -0,0 +1,318 @@ +package goolom + +import ( + "fmt" + "strings" + "time" + + "github.com/google/uuid" + "github.com/openlibrecommunity/olcrtc/internal/logger" + "github.com/pion/webrtc/v4" +) + +func (s *Session) setupDataChannelHandlers(dcReady chan struct{}, sessionCloseCh chan struct{}) { + s.dc.OnOpen(func() { + numWorkers := 4 + for i := range numWorkers { + s.wg.Add(1) + go func(workerID int) { + defer s.wg.Done() + s.processSendQueue(workerID, sessionCloseCh) + }(i) + } + close(dcReady) + }) + + s.dc.OnClose(s.onDataChannelClose) + s.dc.OnMessage(s.onDataChannelMessage) + + s.pcSub.OnDataChannel(func(dc *webrtc.DataChannel) { + if s.onData != nil { + dc.OnMessage(s.onDataChannelMessage) + } + }) +} + +func (s *Session) onDataChannelClose() { + if !s.closed.Load() { + s.queueReconnect() + } +} + +func (s *Session) onDataChannelMessage(msg webrtc.DataChannelMessage) { + if s.onData != nil && len(msg.Data) > 0 { + s.onData(msg.Data) + } +} + +func (s *Session) handleSdpOffer(offer map[string]any, uid string, sendPub bool) error { + sdp, _ := offer["sdp"].(string) + pcSeq, _ := offer["pcSeq"].(float64) + + if err := s.pcSub.SetRemoteDescription(webrtc.SessionDescription{ + Type: webrtc.SDPTypeOffer, + SDP: sdp, + }); err != nil { + return fmt.Errorf("set remote desc: %w", err) + } + + answer, err := s.pcSub.CreateAnswer(nil) + if err != nil { + return fmt.Errorf("create answer: %w", err) + } + + if err := s.pcSub.SetLocalDescription(answer); err != nil { + return fmt.Errorf("set local desc: %w", err) + } + + s.wsMu.Lock() + _ = s.ws.WriteJSON(map[string]any{ + keyUID: uuid.New().String(), + "subscriberSdpAnswer": map[string]any{ + keyPcSeq: int(pcSeq), + "sdp": answer.SDP, + }, + }) + s.wsMu.Unlock() + + s.sendAck(uid) + + if s.onData == nil { + if err := s.sendSetSlots(); err != nil { + logger.Debugf("setSlots error: %v", err) + } + } + + if !sendPub { + return nil + } + + time.Sleep(300 * time.Millisecond) + + pubOffer, err := s.pcPub.CreateOffer(nil) + if err != nil { + return fmt.Errorf("create pub offer: %w", err) + } + if err := s.pcPub.SetLocalDescription(pubOffer); err != nil { + return fmt.Errorf("set local pub desc: %w", err) + } + + s.wsMu.Lock() + _ = s.ws.WriteJSON(map[string]any{ + keyUID: uuid.New().String(), + "publisherSdpOffer": map[string]any{ + keyPcSeq: 1, + "sdp": pubOffer.SDP, + "tracks": s.publisherTrackDescriptions(), + }, + }) + s.wsMu.Unlock() + return nil +} + +func (s *Session) handleSdpAnswer(answer map[string]any, uid string) { + sdp, _ := answer["sdp"].(string) + if err := s.pcPub.SetRemoteDescription(webrtc.SessionDescription{ + Type: webrtc.SDPTypeAnswer, + SDP: sdp, + }); err != nil { + logger.Debugf("SetRemoteDescription error: %v", err) + } + s.sendAck(uid) +} + +func (s *Session) handleICE(cand map[string]any) { + candStr, _ := cand["candidate"].(string) + target, _ := cand["target"].(string) + sdpMid, _ := cand["sdpMid"].(string) + sdpMLineIndex, _ := cand["sdpMlineIndex"].(float64) + + parts := strings.Fields(candStr) + if len(parts) < 8 { + return + } + + init := webrtc.ICECandidateInit{ + Candidate: candStr, + SDPMid: &sdpMid, + SDPMLineIndex: func() *uint16 { v := uint16(sdpMLineIndex); return &v }(), + } + switch target { + case "SUBSCRIBER": + _ = s.pcSub.AddICECandidate(init) + case "PUBLISHER": + _ = s.pcPub.AddICECandidate(init) + } +} + +func (s *Session) setupICEHandlers() { + s.pcSub.OnICECandidate(func(c *webrtc.ICECandidate) { + if c == nil { + return + } + init := c.ToJSON() + s.wsMu.Lock() + _ = s.ws.WriteJSON(map[string]any{ + keyUID: uuid.New().String(), + "webrtcIceCandidate": map[string]any{ + "candidate": init.Candidate, + "sdpMid": init.SDPMid, + "sdpMlineIndex": init.SDPMLineIndex, + "target": "SUBSCRIBER", + keyPcSeq: 1, + }, + }) + s.wsMu.Unlock() + }) + + s.pcPub.OnICECandidate(func(c *webrtc.ICECandidate) { + if c == nil { + return + } + init := c.ToJSON() + s.wsMu.Lock() + _ = s.ws.WriteJSON(map[string]any{ + keyUID: uuid.New().String(), + "webrtcIceCandidate": map[string]any{ + "candidate": init.Candidate, + "sdpMid": init.SDPMid, + "sdpMlineIndex": init.SDPMLineIndex, + "target": "PUBLISHER", + keyPcSeq: 1, + }, + }) + s.wsMu.Unlock() + }) +} + +func (s *Session) sendSetSlots() error { + s.wsMu.Lock() + defer s.wsMu.Unlock() + + // Goolom only forwards as many remote videos as the subscriber asks for via + // setSlots. Request a generous count so each subscriber sees every active + // publisher in the room. + slots := make([]map[string]int, 0, 8) + for range 8 { + slots = append(slots, map[string]int{"width": 1280, "height": 720}) + } + if err := s.ws.WriteJSON(map[string]any{ + keyUID: uuid.New().String(), + "setSlots": map[string]any{ + "slots": slots, + "audioSlotsCount": 0, + "key": 1, + "shutdownAllVideo": nil, + "withSelfView": false, + "selfViewVisibility": "ON_LOADING_THEN_SHOW", + "gridConfig": map[string]any{}, + }, + }); err != nil { + return fmt.Errorf("write set slots: %w", err) + } + return nil +} + +func (s *Session) publisherTrackDescriptions() []map[string]any { + if s.pcPub == nil { + return nil + } + tracks := make([]map[string]any, 0) + for _, transceiver := range s.pcPub.GetTransceivers() { + sender := transceiver.Sender() + if sender == nil { + continue + } + track := sender.Track() + if track == nil { + continue + } + kind := "VIDEO" + if track.Kind() == webrtc.RTPCodecTypeAudio { + kind = "AUDIO" + } + tracks = append(tracks, map[string]any{ + "mid": transceiver.Mid(), + "transceiverMid": transceiver.Mid(), + "kind": kind, + "priority": 0, + "label": track.ID(), + "codecs": map[string]any{}, + "groupId": 1, + keyDescription: "", + }) + } + return tracks +} + +func isNonTURNURL(url string) bool { + return url != "" && !strings.HasPrefix(url, "turn:") && !strings.HasPrefix(url, "turns:") +} + +func parseICEURLs(server map[string]any) []string { + var urls []string + switch rawURLs := server["urls"].(type) { + case []any: + for _, rawURL := range rawURLs { + if url, ok := rawURL.(string); ok && isNonTURNURL(url) { + urls = append(urls, url) + } + } + case []string: + for _, url := range rawURLs { + if isNonTURNURL(url) { + urls = append(urls, url) + } + } + } + return urls +} + +func parseICEServer(rawServer any) (webrtc.ICEServer, bool) { + server, ok := rawServer.(map[string]any) + if !ok { + return webrtc.ICEServer{}, false + } + urls := parseICEURLs(server) + if len(urls) == 0 { + return webrtc.ICEServer{}, false + } + ice := webrtc.ICEServer{URLs: urls} + if username, ok := server["username"].(string); ok { + ice.Username = username + } + if credential, ok := server["credential"].(string); ok { + ice.Credential = credential + } + return ice, true +} + +func (s *Session) applyServerHelloConfig(serverHello map[string]any) { + rawCfg, ok := serverHello["rtcConfiguration"].(map[string]any) + if !ok { + return + } + rawServers, ok := rawCfg["iceServers"].([]any) + if !ok || len(rawServers) == 0 { + return + } + iceServers := make([]webrtc.ICEServer, 0, len(rawServers)) + for _, rawServer := range rawServers { + if ice, ok := parseICEServer(rawServer); ok { + iceServers = append(iceServers, ice) + } + } + if len(iceServers) == 0 { + return + } + cfg := webrtc.Configuration{ + ICEServers: iceServers, + SDPSemantics: webrtc.SDPSemanticsUnifiedPlan, + } + if s.pcSub != nil { + _ = s.pcSub.SetConfiguration(cfg) + } + if s.pcPub != nil { + _ = s.pcPub.SetConfiguration(cfg) + } +} diff --git a/internal/engine/goolom/session.go b/internal/engine/goolom/session.go new file mode 100644 index 0000000..64aa183 --- /dev/null +++ b/internal/engine/goolom/session.go @@ -0,0 +1,322 @@ +// Package goolom implements an engine.Session backed by the Goolom SFU +// signaling protocol. Goolom is the proprietary SFU developed for Yandex +// Telemost; the on-wire protocol — capabilities offer, separated subscriber +// and publisher PeerConnections, ack/pong keepalive, slots-based subscribe +// model — is what this engine speaks. +// +// HTTP auth (room-info lookup, telemetry referer, etc.) lives in the auth +// package; this engine consumes a media-server WebSocket URL plus the +// peer/room/credentials tuple supplied as engine.Config. +package goolom + +import ( + "context" + "errors" + "fmt" + "net/http" + "sync" + "sync/atomic" + "time" + + "github.com/gorilla/websocket" + "github.com/openlibrecommunity/olcrtc/internal/engine" + "github.com/pion/webrtc/v4" +) + +const ( + realDataChannelMessageLimit = 12288 + defaultSendDelayLow = 2 * time.Millisecond + defaultSendDelayMax = 12 * time.Millisecond + defaultTelemetryInterval = 20 * time.Second + defaultSendQueueSize = 5000 + defaultBufferHighWaterMark = 512 * 1024 + defaultSendQueueCapHard = 4000 + + wsReadTimeout = 60 * time.Second + wsHandshakeTimeout = 15 * time.Second + + keyUID = "uid" + keyDescription = "description" + keyPcSeq = "pcSeq" + keyName = "name" + stateTerminated = "terminated" + + credentialKeyRoomID = "roomID" + credentialKeyCredentials = "credentials" + credentialKeyRoomURL = "roomURL" + credentialKeyTelemetryReferer = "telemetryReferer" +) + +var ( + // ErrDataChannelTimeout is returned when the DataChannel fails to open in time. + ErrDataChannelTimeout = errors.New("datachannel timeout") + // ErrDataChannelNotReady is returned when send is called before the DataChannel is open. + ErrDataChannelNotReady = errors.New("datachannel not ready") + // ErrSendQueueClosed is returned when send is called after Close. + ErrSendQueueClosed = errors.New("send queue closed") + // ErrSendQueueTimeout is returned when the send queue cannot accept new data in time. + ErrSendQueueTimeout = errors.New("send queue timeout") + // ErrSessionClosed is returned when the session is closed mid-operation. + ErrSessionClosed = errors.New("session closed") + // ErrPeerClosed is returned when the peer is closed mid-operation. + ErrPeerClosed = errors.New("peer closed") + // ErrSubscriberMediaTimeout is returned when the subscriber media is not ready in time. + ErrSubscriberMediaTimeout = errors.New("subscriber media timeout") + // ErrPublisherNotInitialized is returned when the publisher PC is not set up. + ErrPublisherNotInitialized = errors.New("publisher peer connection not initialized") + // ErrURLRequired is returned when no media-server WebSocket URL was supplied. + ErrURLRequired = errors.New("goolom media server URL required") + // ErrRoomIDRequired is returned when no room ID was supplied. + ErrRoomIDRequired = errors.New("goolom room ID required") + // ErrPeerIDRequired is returned when no peer ID was supplied. + ErrPeerIDRequired = errors.New("goolom peer ID required") + // ErrNoRefresh is returned when reconnect is attempted without a refresh callback. + ErrNoRefresh = errors.New("goolom reconnect: no refresh callback supplied") +) + +// TrafficShape controls outgoing data-channel pacing. +type TrafficShape struct { + MaxMessageSize int + MinDelay time.Duration + MaxDelay time.Duration +} + +// Session is the Goolom engine handle. +type Session struct { + name string + mediaServerURL string + peerID string + roomID string + credentials string + roomURL string // referer for telemetry — opaque to the engine + telemetryReferer string + refresh func(ctx context.Context) (engine.Credentials, error) + + ws *websocket.Conn + wsMu sync.Mutex + pcSub *webrtc.PeerConnection + pcPub *webrtc.PeerConnection + dc *webrtc.DataChannel + + onData func([]byte) + onReconnect func(*webrtc.DataChannel) + shouldReconnect func() bool + onEnded func(string) + + reconnectCh chan struct{} + closeCh chan struct{} + keepAliveCh chan struct{} + telemetryCh chan struct{} + sessionCloseCh chan struct{} + lastReconnect time.Time + reconnectCount int + sessionMu sync.Mutex + + sendQueue chan []byte + sendQueueClosed atomic.Bool + closed atomic.Bool + reconnecting atomic.Bool + telemetryActive atomic.Bool + + ackMu sync.Mutex + ackWaiters map[string]chan struct{} + + trafficShape TrafficShape + + videoTrackMu sync.RWMutex + videoTracks []webrtc.TrackLocal + onVideoTrack func(*webrtc.TrackRemote, *webrtc.RTPReceiver) + subscriberReady atomic.Bool + publisherReady atomic.Bool + subscriberConn chan struct{} + publisherConn chan struct{} + wg sync.WaitGroup + + httpClient *http.Client +} + +// New creates a new Goolom engine session. +// +// cfg.URL is the media server WebSocket URL. cfg.Token carries the peer ID. +// cfg.Extra carries the rest of the room tuple: roomID, credentials, and an +// optional roomURL / telemetryReferer string the engine uses verbatim as the +// Referer header for telemetry posts. +func New(_ context.Context, cfg engine.Config) (engine.Session, error) { + if cfg.URL == "" { + return nil, ErrURLRequired + } + peerID := cfg.Token + if peerID == "" { + return nil, ErrPeerIDRequired + } + roomID := "" + credentials := "" + roomURL := "" + telemetryReferer := "" + if cfg.Extra != nil { + roomID = cfg.Extra[credentialKeyRoomID] + credentials = cfg.Extra[credentialKeyCredentials] + roomURL = cfg.Extra[credentialKeyRoomURL] + telemetryReferer = cfg.Extra[credentialKeyTelemetryReferer] + } + if roomID == "" { + return nil, ErrRoomIDRequired + } + if telemetryReferer == "" { + telemetryReferer = roomURL + } + + return &Session{ + name: cfg.Name, + mediaServerURL: cfg.URL, + peerID: peerID, + roomID: roomID, + credentials: credentials, + roomURL: roomURL, + telemetryReferer: telemetryReferer, + refresh: cfg.Refresh, + onData: cfg.OnData, + reconnectCh: make(chan struct{}, 1), + closeCh: make(chan struct{}), + keepAliveCh: make(chan struct{}), + sessionCloseCh: make(chan struct{}), + telemetryCh: make(chan struct{}, 1), + sendQueue: make(chan []byte, defaultSendQueueSize), + ackWaiters: make(map[string]chan struct{}), + subscriberConn: make(chan struct{}), + publisherConn: make(chan struct{}), + trafficShape: TrafficShape{ + MaxMessageSize: realDataChannelMessageLimit, + MinDelay: defaultSendDelayLow, + MaxDelay: defaultSendDelayMax, + }, + httpClient: nil, + }, nil +} + +// Capabilities reports what this engine can do. +func (s *Session) Capabilities() engine.Capabilities { + return engine.Capabilities{ByteStream: true, VideoTrack: true} +} + +// SetTrafficShape adjusts the outgoing data-channel pacing. +func (s *Session) SetTrafficShape(shape TrafficShape) { + if shape.MaxMessageSize <= 0 { + shape.MaxMessageSize = realDataChannelMessageLimit + } + if shape.MaxDelay < shape.MinDelay { + shape.MaxDelay = shape.MinDelay + } + s.trafficShape = shape +} + +// Send queues data for transmission. +func (s *Session) Send(data []byte) error { + if s.dc == nil || s.dc.ReadyState() != webrtc.DataChannelStateOpen { + return ErrDataChannelNotReady + } + if s.sendQueueClosed.Load() { + return ErrSendQueueClosed + } + select { + case s.sendQueue <- data: + return nil + case <-time.After(50 * time.Millisecond): + return ErrSendQueueTimeout + } +} + +// GetSendQueue returns the transmission queue. +func (s *Session) GetSendQueue() chan []byte { return s.sendQueue } + +// GetBufferedAmount returns the WebRTC buffered amount. +func (s *Session) GetBufferedAmount() uint64 { + if s.dc != nil { + return s.dc.BufferedAmount() + } + return 0 +} + +// SetEndedCallback sets the callback for connection termination. +func (s *Session) SetEndedCallback(cb func(string)) { s.onEnded = cb } + +// SetReconnectCallback sets the callback for reconnection events. +func (s *Session) SetReconnectCallback(cb func(*webrtc.DataChannel)) { s.onReconnect = cb } + +// SetShouldReconnect sets the policy for reconnection. +func (s *Session) SetShouldReconnect(fn func() bool) { s.shouldReconnect = fn } + +// CanSend checks if data can be sent. +func (s *Session) CanSend() bool { + if s.onData == nil { + if s.hasLocalVideoTracks() { + return !s.closed.Load() && s.subscriberReady.Load() && s.publisherReady.Load() + } + return !s.closed.Load() && s.subscriberReady.Load() + } + if s.dc == nil || s.dc.ReadyState() != webrtc.DataChannelStateOpen { + return false + } + return len(s.sendQueue) < defaultSendQueueCapHard +} + +// AddVideoTrack adds a video track to the publisher peer connection. +func (s *Session) AddVideoTrack(track webrtc.TrackLocal) error { + s.videoTrackMu.Lock() + s.videoTracks = append(s.videoTracks, track) + s.videoTrackMu.Unlock() + + if s.pcPub == nil { + return nil + } + if _, err := s.pcPub.AddTrack(track); err != nil { + return fmt.Errorf("failed to add track: %w", err) + } + return nil +} + +// SetVideoTrackHandler registers a callback for remote video tracks. +func (s *Session) SetVideoTrackHandler(cb func(*webrtc.TrackRemote, *webrtc.RTPReceiver)) { + s.videoTrackMu.Lock() + defer s.videoTrackMu.Unlock() + s.onVideoTrack = cb +} + +func (s *Session) hasLocalVideoTracks() bool { + s.videoTrackMu.RLock() + defer s.videoTrackMu.RUnlock() + return len(s.videoTracks) > 0 +} + +func (s *Session) videoTrackHandler() func(*webrtc.TrackRemote, *webrtc.RTPReceiver) { + s.videoTrackMu.RLock() + defer s.videoTrackMu.RUnlock() + return s.onVideoTrack +} + +func (s *Session) attachPendingVideoTracks() error { + s.videoTrackMu.RLock() + defer s.videoTrackMu.RUnlock() + + for _, track := range s.videoTracks { + if _, err := s.pcPub.AddTrack(track); err != nil { + return fmt.Errorf("add video track: %w", err) + } + } + return nil +} + +func closeSignal(ch chan struct{}) { + if ch == nil { + return + } + select { + case <-ch: + default: + close(ch) + } +} + +func init() { //nolint:gochecknoinits // engine registration is the canonical Go pattern for plugins + engine.Register("goolom", New) +} diff --git a/internal/engine/goolom/signaling.go b/internal/engine/goolom/signaling.go new file mode 100644 index 0000000..3608b98 --- /dev/null +++ b/internal/engine/goolom/signaling.go @@ -0,0 +1,303 @@ +package goolom + +import ( + "context" + "fmt" + "runtime" + "strings" + "time" + + "github.com/google/uuid" + "github.com/gorilla/websocket" + "github.com/openlibrecommunity/olcrtc/internal/logger" +) + +func (s *Session) sendHello() error { + hello := map[string]any{ + keyUID: uuid.New().String(), + "hello": map[string]any{ + "participantMeta": map[string]any{ + keyName: s.name, + "role": "SPEAKER", + keyDescription: "", + "sendAudio": false, + "sendVideo": s.hasLocalVideoTracks(), + }, + "participantAttributes": map[string]any{ + keyName: s.name, + "role": "SPEAKER", + keyDescription: "", + }, + "sendAudio": false, + "sendVideo": s.hasLocalVideoTracks(), + "sendSharing": false, + "participantId": s.peerID, + "roomId": s.roomID, + "serviceName": "telemost", + "credentials": s.credentials, + "capabilitiesOffer": goolomCapabilitiesOffer(), + "sdkInfo": map[string]any{ + "implementation": "browser", + "version": "5.27.0", + "userAgent": "Mozilla/5.0 (X11; Linux x86_64; rv:149.0) Gecko/20100101 Firefox/149.0", + "hwConcurrency": runtime.NumCPU(), + }, + "sdkInitializationId": uuid.New().String(), + "disablePublisher": !s.hasLocalVideoTracks(), + "disableSubscriber": false, + "disableSubscriberAudio": true, + }, + } + + s.wsMu.Lock() + defer s.wsMu.Unlock() + if err := s.ws.WriteJSON(hello); err != nil { + return fmt.Errorf("write hello: %w", err) + } + return nil +} + +func (s *Session) handleSignaling(ctx context.Context) { + pubSent := false + + for { + var msg map[string]any + if err := s.ws.ReadJSON(&msg); err != nil { + if !s.closed.Load() { + logger.Debugf("ws read error: %v", err) + s.queueReconnect() + } + return + } + + s.updateWSDeadline() + + uid, _ := msg[keyUID].(string) + s.handleMessageEvents(ctx, msg, uid) + + if isConferenceEndMessage(msg) { + s.signalEnded("conference ended") + return + } + + if offer, ok := msg["subscriberSdpOffer"].(map[string]any); ok { + if err := s.handleSdpOffer(offer, uid, !pubSent); err != nil { + logger.Debugf("sdp offer error: %v", err) + continue + } + pubSent = true + } + + s.handleSignalingResponses(msg, uid) + } +} + +func (s *Session) handleMessageEvents(ctx context.Context, msg map[string]any, uid string) { + if _, ok := msg["ack"]; ok { + s.resolveAck(uid) + } + + if serverHello, ok := msg["serverHello"].(map[string]any); ok { + s.applyServerHelloConfig(serverHello) + s.startTelemetry(ctx, serverHello) + s.sendAck(uid) + } + + s.handleCommonMessages(msg, uid) +} + +func (s *Session) handleSignalingResponses(msg map[string]any, uid string) { + if answer, ok := msg["publisherSdpAnswer"].(map[string]any); ok { + s.handleSdpAnswer(answer, uid) + } + if cand, ok := msg["webrtcIceCandidate"].(map[string]any); ok { + s.handleICE(cand) + } +} + +func (s *Session) updateWSDeadline() { + s.wsMu.Lock() + if s.ws != nil { + _ = s.ws.SetReadDeadline(time.Now().Add(wsReadTimeout)) + } + s.wsMu.Unlock() +} + +func (s *Session) handleCommonMessages(msg map[string]any, uid string) { + if _, ok := msg["updateDescription"]; ok { + s.sendAck(uid) + } + if _, ok := msg["vadActivity"]; ok { + s.sendAck(uid) + } + if _, ok := msg["ping"]; ok { + s.sendPong(uid) + } + if _, ok := msg["pong"]; ok { + s.sendAck(uid) + } +} + +func (s *Session) sendAck(uid string) { + if uid == "" { + return + } + s.wsMu.Lock() + defer s.wsMu.Unlock() + _ = s.ws.WriteJSON(map[string]any{ + keyUID: uid, + "ack": map[string]any{ + "status": map[string]any{"code": "OK"}, + }, + }) +} + +func (s *Session) sendPong(uid string) { + s.wsMu.Lock() + defer s.wsMu.Unlock() + _ = s.ws.WriteJSON(map[string]any{ + keyUID: uid, + "pong": map[string]any{}, + }) +} + +func (s *Session) registerAckWaiter(uid string) chan struct{} { + ch := make(chan struct{}) + s.ackMu.Lock() + s.ackWaiters[uid] = ch + s.ackMu.Unlock() + return ch +} + +func (s *Session) removeAckWaiter(uid string) { + s.ackMu.Lock() + delete(s.ackWaiters, uid) + s.ackMu.Unlock() +} + +func (s *Session) waitForAck(uid string, ch <-chan struct{}, timeout time.Duration) bool { + if uid == "" { + return false + } + defer s.removeAckWaiter(uid) + + select { + case <-ch: + return true + case <-time.After(timeout): + return false + case <-s.closeCh: + return false + } +} + +func (s *Session) resolveAck(uid string) { + if uid == "" { + return + } + s.ackMu.Lock() + ch := s.ackWaiters[uid] + if ch != nil { + delete(s.ackWaiters, uid) + close(ch) + } + s.ackMu.Unlock() +} + +func (s *Session) sendLeave(uid string) bool { + s.wsMu.Lock() + defer s.wsMu.Unlock() + + if s.ws == nil { + return false + } + leave := map[string]any{ + keyUID: uid, + "leave": map[string]any{}, + } + if err := s.ws.WriteJSON(leave); err != nil { + return false + } + return true +} + +func (s *Session) keepAlive(keepAliveCh <-chan struct{}) { + wsTicker := time.NewTicker(30 * time.Second) + defer wsTicker.Stop() + appTicker := time.NewTicker(5 * time.Second) + defer appTicker.Stop() + + for { + select { + case <-wsTicker.C: + if !s.sendWSPing() { + return + } + case <-appTicker.C: + if !s.sendAppPing() { + return + } + case <-keepAliveCh: + return + case <-s.closeCh: + return + } + } +} + +func (s *Session) sendWSPing() bool { + s.wsMu.Lock() + defer s.wsMu.Unlock() + if s.ws != nil { + if err := s.ws.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(10*time.Second)); err != nil { + logger.Debugf("ws ping error: %v", err) + s.queueReconnect() + return false + } + } + return true +} + +func (s *Session) sendAppPing() bool { + s.wsMu.Lock() + defer s.wsMu.Unlock() + if s.ws != nil { + if err := s.ws.WriteJSON(map[string]any{ + keyUID: uuid.New().String(), + "ping": map[string]any{}, + }); err != nil { + logger.Debugf("app ping error: %v", err) + s.queueReconnect() + return false + } + } + return true +} + +func isConferenceEndMessage(msg map[string]any) bool { + for _, key := range []string{"conferenceClosed", "conferenceEnded", "roomClosed", "roomEnded", "callEnded"} { + if _, ok := msg[key]; ok { + return true + } + } + if raw, ok := msg["conference"].(map[string]any); ok { + if state, _ := raw["state"].(string); isEndedState(state) { + return true + } + } + if raw, ok := msg["conferenceState"].(map[string]any); ok { + if state, _ := raw["state"].(string); isEndedState(state) { + return true + } + } + return false +} + +func isEndedState(state string) bool { + switch strings.ToLower(state) { + case "closed", "ended", "finished", stateTerminated: + return true + default: + return false + } +} diff --git a/internal/engine/goolom/state.go b/internal/engine/goolom/state.go new file mode 100644 index 0000000..33e5022 --- /dev/null +++ b/internal/engine/goolom/state.go @@ -0,0 +1,246 @@ +package goolom + +import ( + "bytes" + "context" + "encoding/json" + "math/rand/v2" + "net/http" + "time" + + "github.com/google/uuid" + "github.com/openlibrecommunity/olcrtc/internal/logger" + "github.com/openlibrecommunity/olcrtc/internal/protect" +) + +func (s *Session) processSendQueue(workerID int, sessionCloseCh <-chan struct{}) { + for { + select { + case <-sessionCloseCh: + return + case <-s.closeCh: + return + case data := <-s.sendQueue: + if len(data) > s.trafficShape.MaxMessageSize { + logger.Debugf("oversized message size=%d limit=%d", len(data), s.trafficShape.MaxMessageSize) + continue + } + + waited, err := s.waitBufferedAmount(workerID, sessionCloseCh) + if err != nil { + return + } + if waited > 0 { + logger.Verbosef("[WORKER-%d] Drained after %v", workerID, waited) + } + + if err := s.dc.Send(data); err != nil { + logger.Debugf("send error: %v", err) + s.queueReconnect() + return + } + + if s.trafficShape.MinDelay > 0 { + time.Sleep(s.calculateDelay()) + } + } + } +} + +func (s *Session) waitBufferedAmount(workerID int, sessionCloseCh <-chan struct{}) (time.Duration, error) { + start := time.Now() + for s.dc.BufferedAmount() > defaultBufferHighWaterMark { + select { + case <-sessionCloseCh: + return 0, ErrSessionClosed + case <-s.closeCh: + return 0, ErrPeerClosed + case <-time.After(10 * time.Millisecond): + if time.Since(start) > 5*time.Second { + logger.Debugf("buffer wait timeout worker=%d", workerID) + return time.Since(start), nil + } + } + } + return time.Since(start), nil +} + +func (s *Session) calculateDelay() time.Duration { + minDelay := s.trafficShape.MinDelay + maxDelay := s.trafficShape.MaxDelay + if maxDelay <= minDelay { + return minDelay + } + return minDelay + time.Duration(rand.Int64N(int64(maxDelay-minDelay))) //nolint:gosec,lll // G404: non-cryptographic shaping randomness +} + +func (s *Session) startTelemetry(ctx context.Context, serverHello map[string]any) { + endpoint, interval, ok := parseTelemetryCfg(serverHello) + if !ok { + return + } + if !s.telemetryActive.CompareAndSwap(false, true) { + return + } + + s.wg.Add(1) + go func() { + defer s.wg.Done() + defer s.telemetryActive.Store(false) + + ticker := time.NewTicker(interval) + defer ticker.Stop() + + s.sendTelemetry(ctx, endpoint, "join") + for { + select { + case <-ticker.C: + s.sendTelemetry(ctx, endpoint, "stats") + case <-s.telemetryCh: + s.sendTelemetry(ctx, endpoint, "leave") + return + case <-s.closeCh: + s.sendTelemetry(ctx, endpoint, "leave") + return + } + } + }() +} + +func parseTelemetryCfg(serverHello map[string]any) (string, time.Duration, bool) { + cfg, ok := serverHello["telemetryConfiguration"].(map[string]any) + if !ok { + return "", 0, false + } + endpoint, ok := cfg["logEndpoint"].(string) + if !ok || endpoint == "" { + endpoint, ok = cfg["endpoint"].(string) + if !ok || endpoint == "" { + endpoint, _ = cfg["url"].(string) + } + } + if endpoint == "" { + return "", 0, false + } + interval := defaultTelemetryInterval + if raw, ok := cfg["sendingInterval"].(float64); ok && raw > 0 { + interval = time.Duration(raw) * time.Millisecond + } + return endpoint, interval, true +} + +func (s *Session) stopTelemetry() { + if s.telemetryActive.Load() { + select { + case s.telemetryCh <- struct{}{}: + default: + } + } +} + +func (s *Session) sendTelemetry(ctx context.Context, endpoint, event string) { + body, err := json.Marshal(map[string]any{ + "event": event, + "timestamp": time.Now().UnixMilli(), + "peerId": s.peerID, + "roomId": s.roomID, + "displayName": s.name, + "implementation": "olcrtc-go", + "dataChannel": map[string]any{ + "bufferedAmount": s.GetBufferedAmount(), + "sendQueue": len(s.sendQueue), + }, + }) + if err != nil { + return + } + + req, err := http.NewRequestWithContext(ctx, http.MethodPost, endpoint, bytes.NewReader(body)) + if err != nil { + logger.Verbosef("Telemetry req error: %v", err) + return + } + req.Header.Set("Content-Type", "application/json") + req.Header.Set("User-Agent", "Mozilla/5.0 (X11; Linux x86_64; rv:149.0) Gecko/20100101 Firefox/149.0") + if s.telemetryReferer != "" { + req.Header.Set("Referer", s.telemetryReferer) + } + req.Header.Set("X-Requested-With", "XMLHttpRequest") + req.Header.Set("Client-Instance-Id", uuid.New().String()) + req.Header.Set("X-Telemost-Client-Version", "187.1.0") + req.Header.Set("Idempotency-Key", uuid.New().String()) + + client := protect.NewHTTPClient() + resp, err := client.Do(req) + if err != nil { + logger.Verbosef("Telemetry send error: %v", err) + return + } + defer func() { _ = resp.Body.Close() }() +} + +func goolomCapabilitiesOffer() map[string]any { + return map[string]any{ + "offerAnswerMode": []string{"SEPARATE"}, + "initialSubscriberOffer": []string{"ON_HELLO"}, + "slotsMode": []string{"FROM_CONTROLLER"}, + "simulcastMode": []string{"DISABLED", "STATIC"}, + "selfVadStatus": []string{"FROM_SERVER", "FROM_CLIENT"}, + "dataChannelSharing": []string{"TO_RTP"}, + "videoEncoderConfig": []string{"NO_CONFIG", "ONLY_INIT_CONFIG", "RUNTIME_CONFIG"}, + "dataChannelVideoCodec": []string{"VP8", "UNIQUE_CODEC_FROM_TRACK_DESCRIPTION"}, + "bandwidthLimitationReason": []string{ + "BANDWIDTH_REASON_DISABLED", + "BANDWIDTH_REASON_ENABLED", + }, + "sdkDefaultDeviceManagement": []string{ + "SDK_DEFAULT_DEVICE_MANAGEMENT_DISABLED", + "SDK_DEFAULT_DEVICE_MANAGEMENT_ENABLED", + }, + "joinOrderLayout": []string{"JOIN_ORDER_LAYOUT_DISABLED", "JOIN_ORDER_LAYOUT_ENABLED"}, + "pinLayout": []string{"PIN_LAYOUT_DISABLED"}, + "sendSelfViewVideoSlot": []string{ + "SEND_SELF_VIEW_VIDEO_SLOT_DISABLED", + "SEND_SELF_VIEW_VIDEO_SLOT_ENABLED", + }, + "serverLayoutTransition": []string{"SERVER_LAYOUT_TRANSITION_DISABLED"}, + "sdkPublisherOptimizeBitrate": []string{ + "SDK_PUBLISHER_OPTIMIZE_BITRATE_DISABLED", + "SDK_PUBLISHER_OPTIMIZE_BITRATE_FULL", + "SDK_PUBLISHER_OPTIMIZE_BITRATE_ONLY_SELF", + }, + "sdkNetworkLostDetection": []string{"SDK_NETWORK_LOST_DETECTION_DISABLED"}, + "sdkNetworkPathMonitor": []string{"SDK_NETWORK_PATH_MONITOR_DISABLED"}, + "publisherVp9": []string{"PUBLISH_VP9_DISABLED", "PUBLISH_VP9_ENABLED"}, + "svcMode": []string{"SVC_MODE_DISABLED", "SVC_MODE_L3T3", "SVC_MODE_L3T3_KEY"}, + "subscriberOfferAsyncAck": []string{"SUBSCRIBER_OFFER_ASYNC_ACK_DISABLED", "SUBSCRIBER_OFFER_ASYNC_ACK_ENABLED"}, + "androidBluetoothRoutingFix": []string{ + "ANDROID_BLUETOOTH_ROUTING_FIX_DISABLED", + }, + "fixedIceCandidatesPoolSize": []string{ + "FIXED_ICE_CANDIDATES_POOL_SIZE_DISABLED", + }, + "sdkAndroidTelecomIntegration": []string{ + "SDK_ANDROID_TELECOM_INTEGRATION_DISABLED", + }, + "setActiveCodecsMode": []string{ + "SET_ACTIVE_CODECS_MODE_DISABLED", + "SET_ACTIVE_CODECS_MODE_VIDEO_ONLY", + }, + "subscriberDtlsPassiveMode": []string{ + "SUBSCRIBER_DTLS_PASSIVE_MODE_DISABLED", + }, + "publisherOpusDred": []string{ + "PUBLISHER_OPUS_DRED_DISABLED", + }, + "publisherOpusLowBitrate": []string{ + "PUBLISHER_OPUS_LOW_BITRATE_DISABLED", + }, + "sdkAndroidDestroySessionOnTaskRemoved": []string{ + "SDK_ANDROID_DESTROY_SESSION_ON_TASK_REMOVED_DISABLED", + }, + "svcModes": []string{"FALSE"}, + "reportTelemetryModes": []string{"TRUE"}, + "keepDefaultDevicesModes": []string{"FALSE"}, + } +} diff --git a/internal/provider/provider.go b/internal/provider/provider.go deleted file mode 100644 index 600a90d..0000000 --- a/internal/provider/provider.go +++ /dev/null @@ -1,50 +0,0 @@ -// Package provider defines the interface and registry for different WebRTC providers. -package provider - -import ( - "context" - "errors" - - "github.com/pion/webrtc/v4" -) - -var ( - // ErrDataChannelTimeout is returned when the DataChannel fails to open within the timeout period. - ErrDataChannelTimeout = errors.New("datachannel timeout") - // ErrDataChannelNotReady is returned when attempting to send data before the DataChannel is open. - ErrDataChannelNotReady = errors.New("datachannel not ready") - // ErrSendQueueClosed is returned when attempting to send data after the send queue has been closed. - ErrSendQueueClosed = errors.New("send queue closed") - // ErrSendQueueTimeout is returned when the send queue is full and the timeout is reached. - ErrSendQueueTimeout = errors.New("send queue timeout") -) - -// Provider defines the standard interface for WebRTC connection handlers. -type Provider interface { - Connect(ctx context.Context) error - Send(data []byte) error - Close() error - SetReconnectCallback(cb func(*webrtc.DataChannel)) - SetShouldReconnect(fn func() bool) - SetEndedCallback(cb func(string)) - WatchConnection(ctx context.Context) - CanSend() bool - GetSendQueue() chan []byte - GetBufferedAmount() uint64 -} - -// VideoTrackCapable is implemented by providers that can exchange video tracks. -type VideoTrackCapable interface { - AddVideoTrack(track webrtc.TrackLocal) error - SetVideoTrackHandler(cb func(*webrtc.TrackRemote, *webrtc.RTPReceiver)) -} - -// Config holds common configuration for all providers. -type Config struct { - RoomURL string - Name string - OnData func([]byte) - DNSServer string - ProxyAddr string - ProxyPort int -} diff --git a/internal/provider/telemost/api_test.go b/internal/provider/telemost/api_test.go deleted file mode 100644 index 1650e60..0000000 --- a/internal/provider/telemost/api_test.go +++ /dev/null @@ -1,83 +0,0 @@ -package telemost - -import ( - "context" - "encoding/json" - "errors" - "net/http" - "net/http/httptest" - "strings" - "testing" -) - -func withTelemostAPIServer(t *testing.T, h http.Handler) { - t.Helper() - old := apiBase - srv := httptest.NewServer(h) - t.Cleanup(func() { - apiBase = old - srv.Close() - }) - apiBase = srv.URL -} - -func TestGetConnectionInfo(t *testing.T) { - withTelemostAPIServer(t, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - if r.Method != http.MethodGet { - t.Fatalf("method = %s", r.Method) - } - if !strings.Contains(r.URL.EscapedPath(), "/conferences/room%2Fid/connection") { - t.Fatalf("path = %q escaped=%q", r.URL.Path, r.URL.EscapedPath()) - } - if r.URL.Query().Get("display_name") != "peer" { - t.Fatalf("display_name query = %q", r.URL.Query().Get("display_name")) - } - _ = json.NewEncoder(w).Encode(ConnectionInfo{ - RoomID: "room", //nolint:goconst // test literal, repetition is intentional - PeerID: "peer-id", //nolint:goconst // test literal, repetition is intentional - Credentials: "creds", //nolint:goconst // test literal, repetition is intentional - }) - })) - - info, err := GetConnectionInfo(context.Background(), "room/id", "peer") - if err != nil { - t.Fatalf("GetConnectionInfo() error = %v", err) - } - if info.RoomID != "room" || info.PeerID != "peer-id" || info.Credentials != "creds" { - t.Fatalf("GetConnectionInfo() = %+v", info) - } -} - -func TestGetConnectionInfoErrors(t *testing.T) { - withTelemostAPIServer(t, http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { - http.Error(w, "bad", http.StatusForbidden) - })) - if _, err := GetConnectionInfo(context.Background(), "room", "peer"); !errors.Is(err, ErrAPI) { - t.Fatalf("GetConnectionInfo() error = %v, want %v", err, ErrAPI) - } - - withTelemostAPIServer(t, http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { - _, _ = w.Write([]byte("{")) - })) - if _, err := GetConnectionInfo(context.Background(), "room", "peer"); err == nil { - t.Fatal("GetConnectionInfo() unexpectedly accepted bad json") - } -} - -func TestTelemostNewPeerUsesConnectionInfo(t *testing.T) { - withTelemostAPIServer(t, http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { - _ = json.NewEncoder(w).Encode(ConnectionInfo{ - RoomID: "room", - PeerID: "peer-id", - Credentials: "creds", - }) - })) - - p, err := NewPeer(context.Background(), "room", "name", nil) - if err != nil { - t.Fatalf("NewPeer() error = %v", err) - } - if p.roomURL != "room" || p.name != "name" || p.conn.PeerID != "peer-id" || p.sendQueue == nil { - t.Fatalf("NewPeer() = %+v", p) - } -} diff --git a/internal/provider/telemost/peer.go b/internal/provider/telemost/peer.go deleted file mode 100644 index 0a16591..0000000 --- a/internal/provider/telemost/peer.go +++ /dev/null @@ -1,1514 +0,0 @@ -// Package telemost implements the Yandex Telemost WebRTC provider. -package telemost - -import ( - "bytes" - "context" - "encoding/json" - "errors" - "fmt" - "math/rand/v2" - "net/http" - "runtime" - "strings" - "sync" - "sync/atomic" - "time" - - "github.com/google/uuid" - "github.com/gorilla/websocket" - "github.com/openlibrecommunity/olcrtc/internal/logger" - "github.com/openlibrecommunity/olcrtc/internal/protect" - "github.com/pion/webrtc/v4" -) - -const ( - realDataChannelMessageLimit = 12288 - defaultSendDelayLow = 2 * time.Millisecond - defaultSendDelayMax = 12 * time.Millisecond - defaultTelemetryInterval = 20 * time.Second - - keyUID = "uid" - keyDescription = "description" - keyPcSeq = "pcSeq" - keyName = "name" - stateTerminated = "terminated" -) - -var ( - // ErrDataChannelTimeout is returned when the DataChannel fails to open in time. - ErrDataChannelTimeout = errors.New("datachannel timeout") - // ErrDataChannelNotReady is returned when attempting to send data before the DataChannel is open. - ErrDataChannelNotReady = errors.New("datachannel not ready") - // ErrSendQueueClosed is returned when attempting to send data after the send queue has been closed. - ErrSendQueueClosed = errors.New("send queue closed") - // ErrSendQueueTimeout is returned when the send queue is full and the timeout is reached. - ErrSendQueueTimeout = errors.New("send queue timeout") - // ErrSessionClosed is returned when the session is closed. - ErrSessionClosed = errors.New("session closed") - // ErrPeerClosed is returned when the peer is closed. - ErrPeerClosed = errors.New("peer closed") - // ErrSubscriberMediaTimeout is returned when subscriber media is not ready within the timeout period. - ErrSubscriberMediaTimeout = errors.New("subscriber media timeout") -) - -// TrafficShape defines the parameters for outgoing traffic control. -type TrafficShape struct { - MaxMessageSize int - MinDelay time.Duration - MaxDelay time.Duration -} - -// Peer represents a Yandex Telemost WebRTC connection. -type Peer struct { - roomURL string - name string - conn *ConnectionInfo - ws *websocket.Conn - wsMu sync.Mutex - pcSub *webrtc.PeerConnection - pcPub *webrtc.PeerConnection - dc *webrtc.DataChannel - onData func([]byte) - onReconnect func(*webrtc.DataChannel) - shouldReconnect func() bool - reconnectCh chan struct{} - closeCh chan struct{} - keepAliveCh chan struct{} - telemetryCh chan struct{} - lastReconnect time.Time - reconnectCount int - sessionMu sync.Mutex - sendQueue chan []byte - sendQueueClosed atomic.Bool - closed atomic.Bool - reconnecting atomic.Bool - telemetryActive atomic.Bool - ackMu sync.Mutex - ackWaiters map[string]chan struct{} - onEnded func(string) - trafficShape TrafficShape - sessionCloseCh chan struct{} - videoTrackMu sync.RWMutex - videoTracks []webrtc.TrackLocal - onVideoTrack func(*webrtc.TrackRemote, *webrtc.RTPReceiver) - subscriberReady atomic.Bool - publisherReady atomic.Bool - subscriberConn chan struct{} - publisherConn chan struct{} - wg sync.WaitGroup -} - -// GetSendQueue returns the transmission queue. -func (p *Peer) GetSendQueue() chan []byte { - return p.sendQueue -} - -// GetBufferedAmount returns the WebRTC buffered amount. -func (p *Peer) GetBufferedAmount() uint64 { - if p.dc != nil { - return p.dc.BufferedAmount() - } - return 0 -} - -// SetEndedCallback sets the callback for connection termination. -func (p *Peer) SetEndedCallback(cb func(string)) { - p.onEnded = cb -} - -// SetTrafficShape configures the traffic control parameters. -func (p *Peer) SetTrafficShape(shape TrafficShape) { - if shape.MaxMessageSize <= 0 { - shape.MaxMessageSize = realDataChannelMessageLimit - } - if shape.MaxDelay < shape.MinDelay { - shape.MaxDelay = shape.MinDelay - } - p.trafficShape = shape -} - -// NewPeer creates a new Telemost provider peer. -func NewPeer(ctx context.Context, roomURL, name string, onData func([]byte)) (*Peer, error) { - conn, err := GetConnectionInfo(ctx, roomURL, name) - if err != nil { - return nil, fmt.Errorf("failed to get connection info: %w", err) - } - - return &Peer{ - roomURL: roomURL, - name: name, - conn: conn, - onData: onData, - reconnectCh: make(chan struct{}, 1), - closeCh: make(chan struct{}), - keepAliveCh: make(chan struct{}), - sessionCloseCh: make(chan struct{}), - telemetryCh: make(chan struct{}, 1), - sendQueue: make(chan []byte, 5000), - ackWaiters: make(map[string]chan struct{}), - subscriberConn: make(chan struct{}), - publisherConn: make(chan struct{}), - trafficShape: TrafficShape{ - MaxMessageSize: realDataChannelMessageLimit, - MinDelay: defaultSendDelayLow, - MaxDelay: defaultSendDelayMax, - }, - }, nil -} - -func closeSignal(ch chan struct{}) { - if ch == nil { - return - } - select { - case <-ch: - default: - close(ch) - } -} - -func (p *Peer) queueReconnect() { - if p.closed.Load() || p.reconnecting.Load() { - return - } - if p.shouldReconnect != nil && !p.shouldReconnect() { - return - } - select { - case p.reconnectCh <- struct{}{}: - default: - } -} - -func (p *Peer) stopSession() { - p.stopTelemetry() - - p.sessionMu.Lock() - closeSignal(p.keepAliveCh) - closeSignal(p.sessionCloseCh) - p.sessionMu.Unlock() -} - -func (p *Peer) resetSession() (chan struct{}, chan struct{}) { - p.sessionMu.Lock() - defer p.sessionMu.Unlock() - - p.keepAliveCh = make(chan struct{}) - p.sessionCloseCh = make(chan struct{}) - return p.keepAliveCh, p.sessionCloseCh -} - -func (p *Peer) resetMediaState() { - p.subscriberReady.Store(false) - p.publisherReady.Store(false) - p.subscriberConn = make(chan struct{}) - p.publisherConn = make(chan struct{}) -} - -func (p *Peer) hasLocalVideoTracks() bool { - p.videoTrackMu.RLock() - defer p.videoTrackMu.RUnlock() - return len(p.videoTracks) > 0 -} - -func (p *Peer) videoTrackHandler() func(*webrtc.TrackRemote, *webrtc.RTPReceiver) { - p.videoTrackMu.RLock() - defer p.videoTrackMu.RUnlock() - return p.onVideoTrack -} - -func (p *Peer) attachPendingVideoTracks() error { - p.videoTrackMu.RLock() - defer p.videoTrackMu.RUnlock() - - for _, track := range p.videoTracks { - if _, err := p.pcPub.AddTrack(track); err != nil { - return fmt.Errorf("add video track: %w", err) - } - } - - return nil -} - -func (p *Peer) drainReconnectQueue() { - for { - select { - case <-p.reconnectCh: - default: - return - } - } -} - -// Connect starts the WebRTC connection process. -func (p *Peer) Connect(ctx context.Context) error { - p.closed.Store(false) - p.resetMediaState() - - config := webrtc.Configuration{ - ICEServers: []webrtc.ICEServer{{URLs: []string{"stun:stun.rtc.yandex.net:3478"}}}, - SDPSemantics: webrtc.SDPSemanticsUnifiedPlan, - } - - if err := p.setupPeerConnections(config); err != nil { - return err - } - - keepAliveCh, sessionCloseCh := p.resetSession() - var dcReady chan struct{} - if p.onData != nil { - var err error - p.dc, err = p.pcPub.CreateDataChannel("olcrtc", nil) - if err != nil { - return fmt.Errorf("create dc: %w", err) - } - - dcReady = make(chan struct{}) - p.setupDataChannelHandlers(dcReady, sessionCloseCh) - } - - if err := p.dialWebSocket(); err != nil { - return err - } - - p.setupICEHandlers() - p.startBackgroundGoroutines(ctx, keepAliveCh) - - if p.onData != nil { - select { - case <-dcReady: - return nil - case <-time.After(15 * time.Second): - return ErrDataChannelTimeout - case <-ctx.Done(): - return fmt.Errorf("connect context cancelled: %w", ctx.Err()) - } - } - - return p.waitForMediaReady(ctx, 20*time.Second) -} - -func (p *Peer) waitForMediaReady(ctx context.Context, timeout time.Duration) error { - timer := time.NewTimer(timeout) - defer timer.Stop() - - select { - case <-p.subscriberConn: - case <-timer.C: - return ErrSubscriberMediaTimeout - case <-ctx.Done(): - return fmt.Errorf("connect context cancelled: %w", ctx.Err()) - } - - return nil -} - -func (p *Peer) setupPeerConnections(config webrtc.Configuration) error { - settingEngine := webrtc.SettingEngine{} - if protect.Protector != nil { - settingEngine.SetICEProxyDialer(protect.NewProxyDialer()) - } - api := webrtc.NewAPI(webrtc.WithSettingEngine(settingEngine)) - - var err error - p.pcSub, err = api.NewPeerConnection(config) - if err != nil { - return fmt.Errorf("new sub pc: %w", err) - } - p.pcSub.OnConnectionStateChange(p.onSubscriberConnectionStateChange) - p.pcSub.OnTrack(func(track *webrtc.TrackRemote, receiver *webrtc.RTPReceiver) { - if track.Kind() != webrtc.RTPCodecTypeVideo { - return - } - - logger.Infof("telemost remote video track: codec=%s stream=%s track=%s", - track.Codec().MimeType, track.StreamID(), track.ID()) - - if cb := p.videoTrackHandler(); cb != nil { - cb(track, receiver) - } - }) - - p.pcPub, err = api.NewPeerConnection(config) - if err != nil { - return fmt.Errorf("new pub pc: %w", err) - } - p.pcPub.OnConnectionStateChange(p.onPublisherConnectionStateChange) - - if err := p.attachPendingVideoTracks(); err != nil { - return err - } - - return nil -} - -func (p *Peer) onConnectionStateChange(state webrtc.PeerConnectionState) { - if !p.closed.Load() && state == webrtc.PeerConnectionStateFailed { - p.queueReconnect() - } -} - -func (p *Peer) onSubscriberConnectionStateChange(state webrtc.PeerConnectionState) { - logger.Debugf("telemost subscriber state: %s", state.String()) - switch state { - case webrtc.PeerConnectionStateConnected: - p.subscriberReady.Store(true) - closeSignal(p.subscriberConn) - case webrtc.PeerConnectionStateDisconnected, - webrtc.PeerConnectionStateFailed, - webrtc.PeerConnectionStateClosed: - p.subscriberReady.Store(false) - case webrtc.PeerConnectionStateUnknown, - webrtc.PeerConnectionStateNew, - webrtc.PeerConnectionStateConnecting: - } - p.onConnectionStateChange(state) -} - -func (p *Peer) onPublisherConnectionStateChange(state webrtc.PeerConnectionState) { - logger.Debugf("telemost publisher state: %s", state.String()) - switch state { - case webrtc.PeerConnectionStateConnected: - p.publisherReady.Store(true) - closeSignal(p.publisherConn) - case webrtc.PeerConnectionStateDisconnected, - webrtc.PeerConnectionStateFailed, - webrtc.PeerConnectionStateClosed: - p.publisherReady.Store(false) - case webrtc.PeerConnectionStateUnknown, - webrtc.PeerConnectionStateNew, - webrtc.PeerConnectionStateConnecting: - } - p.onConnectionStateChange(state) -} - -func (p *Peer) setupDataChannelHandlers(dcReady chan struct{}, sessionCloseCh chan struct{}) { - p.dc.OnOpen(func() { - numWorkers := 4 - for i := range numWorkers { - p.wg.Add(1) - go func(workerID int) { - defer p.wg.Done() - p.processSendQueue(workerID, sessionCloseCh) - }(i) - } - close(dcReady) - }) - - p.dc.OnClose(p.onDataChannelClose) - p.dc.OnMessage(p.onDataChannelMessage) - - p.pcSub.OnDataChannel(func(dc *webrtc.DataChannel) { - if p.onData != nil { - dc.OnMessage(p.onDataChannelMessage) - } - }) -} - -func (p *Peer) onDataChannelClose() { - if !p.closed.Load() { - p.queueReconnect() - } -} - -func (p *Peer) onDataChannelMessage(msg webrtc.DataChannelMessage) { - if p.onData != nil && len(msg.Data) > 0 { - p.onData(msg.Data) - } -} - -func (p *Peer) dialWebSocket() error { - wsDialer := websocket.Dialer{ - NetDialContext: protect.DialContext, - HandshakeTimeout: 15 * time.Second, - } - ws, resp, err := wsDialer.Dial(p.conn.ClientConfig.MediaServerURL, nil) - if err != nil { - return fmt.Errorf("dial ws: %w", err) - } - if resp != nil && resp.Body != nil { - _ = resp.Body.Close() - } - p.ws = ws - - ws.SetPongHandler(func(string) error { - _ = ws.SetReadDeadline(time.Now().Add(60 * time.Second)) - return nil - }) - _ = ws.SetReadDeadline(time.Now().Add(60 * time.Second)) - return nil -} - -func (p *Peer) startBackgroundGoroutines(ctx context.Context, keepAliveCh chan struct{}) { - p.wg.Add(1) - go func() { - defer p.wg.Done() - p.keepAlive(keepAliveCh) - }() - - _ = p.sendHello() - - p.wg.Add(1) - go func() { - defer p.wg.Done() - p.handleSignaling(ctx) - }() -} - -// Send queues data for transmission. -func (p *Peer) Send(data []byte) error { - if p.dc == nil || p.dc.ReadyState() != webrtc.DataChannelStateOpen { - return ErrDataChannelNotReady - } - - if p.sendQueueClosed.Load() { - return ErrSendQueueClosed - } - - select { - case p.sendQueue <- data: - return nil - case <-time.After(50 * time.Millisecond): - return ErrSendQueueTimeout - } -} - -func (p *Peer) sendHello() error { - hello := map[string]interface{}{ - keyUID: uuid.New().String(), - "hello": map[string]interface{}{ - "participantMeta": map[string]interface{}{ - keyName: p.name, - "role": "SPEAKER", - keyDescription: "", - "sendAudio": false, - "sendVideo": p.hasLocalVideoTracks(), - }, - "participantAttributes": map[string]interface{}{ - keyName: p.name, - "role": "SPEAKER", - keyDescription: "", - }, - "sendAudio": false, - "sendVideo": p.hasLocalVideoTracks(), - "sendSharing": false, - "participantId": p.conn.PeerID, - "roomId": p.conn.RoomID, - "serviceName": "telemost", - "credentials": p.conn.Credentials, - "capabilitiesOffer": telemostCapabilitiesOffer(), - "sdkInfo": map[string]interface{}{ - "implementation": "browser", - "version": "5.27.0", - "userAgent": "Mozilla/5.0 (X11; Linux x86_64; rv:149.0) Gecko/20100101 Firefox/149.0", - "hwConcurrency": runtime.NumCPU(), - }, - "sdkInitializationId": uuid.New().String(), - "disablePublisher": !p.hasLocalVideoTracks(), - "disableSubscriber": false, - "disableSubscriberAudio": true, - }, - } - - p.wsMu.Lock() - defer p.wsMu.Unlock() - if err := p.ws.WriteJSON(hello); err != nil { - return fmt.Errorf("write hello: %w", err) - } - return nil -} - -func (p *Peer) handleSignaling(ctx context.Context) { - pubSent := false - - for { - var msg map[string]interface{} - if err := p.ws.ReadJSON(&msg); err != nil { - if !p.closed.Load() { - logger.Debugf("ws read error: %v", err) - p.queueReconnect() - } - return - } - - p.updateWSDeadline() - - uid, _ := msg[keyUID].(string) - p.handleMessageEvents(ctx, msg, uid) - - if isConferenceEndMessage(msg) { - p.signalEnded("conference ended") - return - } - - if offer, ok := msg["subscriberSdpOffer"].(map[string]interface{}); ok { - if err := p.handleSdpOffer(offer, uid, !pubSent); err != nil { - logger.Debugf("sdp offer error: %v", err) - continue - } - pubSent = true - } - - p.handleSignalingResponses(msg, uid) - } -} - -func (p *Peer) handleMessageEvents(ctx context.Context, msg map[string]interface{}, uid string) { - if _, ok := msg["ack"]; ok { - p.resolveAck(uid) - } - - if serverHello, ok := msg["serverHello"].(map[string]interface{}); ok { - p.applyServerHelloConfig(serverHello) - p.startTelemetry(ctx, serverHello) - p.sendAck(uid) - } - - p.handleCommonMessages(msg, uid) -} - -func (p *Peer) handleSignalingResponses(msg map[string]interface{}, uid string) { - if answer, ok := msg["publisherSdpAnswer"].(map[string]interface{}); ok { - p.handleSdpAnswer(answer, uid) - } - - if cand, ok := msg["webrtcIceCandidate"].(map[string]interface{}); ok { - p.handleICE(cand) - } -} - -func (p *Peer) updateWSDeadline() { - p.wsMu.Lock() - if p.ws != nil { - _ = p.ws.SetReadDeadline(time.Now().Add(60 * time.Second)) - } - p.wsMu.Unlock() -} - -func (p *Peer) handleCommonMessages(msg map[string]interface{}, uid string) { - if _, ok := msg["updateDescription"]; ok { - p.sendAck(uid) - } - if _, ok := msg["vadActivity"]; ok { - p.sendAck(uid) - } - if _, ok := msg["ping"]; ok { - p.sendPong(uid) - } - if _, ok := msg["pong"]; ok { - p.sendAck(uid) - } -} - -func (p *Peer) handleSdpOffer(offer map[string]interface{}, uid string, sendPub bool) error { - sdp, _ := offer["sdp"].(string) - pcSeq, _ := offer["pcSeq"].(float64) - - if err := p.pcSub.SetRemoteDescription(webrtc.SessionDescription{ - Type: webrtc.SDPTypeOffer, - SDP: sdp, - }); err != nil { - return fmt.Errorf("set remote desc: %w", err) - } - - answer, err := p.pcSub.CreateAnswer(nil) - if err != nil { - return fmt.Errorf("create answer: %w", err) - } - - if err := p.pcSub.SetLocalDescription(answer); err != nil { - return fmt.Errorf("set local desc: %w", err) - } - - p.wsMu.Lock() - _ = p.ws.WriteJSON(map[string]interface{}{ - keyUID: uuid.New().String(), - "subscriberSdpAnswer": map[string]interface{}{ - keyPcSeq: int(pcSeq), - "sdp": answer.SDP, - }, - }) - p.wsMu.Unlock() - - p.sendAck(uid) - - if p.onData == nil { - if err := p.sendSetSlots(); err != nil { - logger.Debugf("setSlots error: %v", err) - } - } - - if !sendPub { - return nil - } - - time.Sleep(300 * time.Millisecond) - - pubOffer, err := p.pcPub.CreateOffer(nil) - if err != nil { - return fmt.Errorf("create pub offer: %w", err) - } - - if err := p.pcPub.SetLocalDescription(pubOffer); err != nil { - return fmt.Errorf("set local pub desc: %w", err) - } - - p.wsMu.Lock() - _ = p.ws.WriteJSON(map[string]interface{}{ - keyUID: uuid.New().String(), - "publisherSdpOffer": map[string]interface{}{ - keyPcSeq: 1, - "sdp": pubOffer.SDP, - "tracks": p.publisherTrackDescriptions(), - }, - }) - p.wsMu.Unlock() - return nil -} - -func (p *Peer) sendSetSlots() error { - p.wsMu.Lock() - defer p.wsMu.Unlock() - - // Telemost only forwards as many remote videos as the subscriber asks for - // via setSlots. Two slots are enough for a single pair, but once multiple - // olcrtc peers share one room the later publishers may never be subscribed - // at all, which makes their vp8channel session appear "silent". Request a - // generous number of slots so each subscriber can receive every active - // publisher in the room. - slots := make([]map[string]int, 0, 8) - for range 8 { - slots = append(slots, map[string]int{"width": 1280, "height": 720}) - } - - if err := p.ws.WriteJSON(map[string]interface{}{ - keyUID: uuid.New().String(), - "setSlots": map[string]interface{}{ - "slots": slots, - "audioSlotsCount": 0, - "key": 1, - "shutdownAllVideo": nil, - "withSelfView": false, - "selfViewVisibility": "ON_LOADING_THEN_SHOW", - "gridConfig": map[string]interface{}{}, - }, - }); err != nil { - return fmt.Errorf("write set slots: %w", err) - } - return nil -} - -func isNonTURNURL(url string) bool { - return url != "" && !strings.HasPrefix(url, "turn:") && !strings.HasPrefix(url, "turns:") -} - -func parseICEURLs(server map[string]interface{}) []string { - var urls []string - switch rawURLs := server["urls"].(type) { - case []interface{}: - for _, rawURL := range rawURLs { - if url, ok := rawURL.(string); ok && isNonTURNURL(url) { - urls = append(urls, url) - } - } - case []string: - for _, url := range rawURLs { - if isNonTURNURL(url) { - urls = append(urls, url) - } - } - } - return urls -} - -func parseICEServer(rawServer interface{}) (webrtc.ICEServer, bool) { - server, ok := rawServer.(map[string]interface{}) - if !ok { - return webrtc.ICEServer{}, false - } - urls := parseICEURLs(server) - if len(urls) == 0 { - return webrtc.ICEServer{}, false - } - ice := webrtc.ICEServer{URLs: urls} - if username, ok := server["username"].(string); ok { - ice.Username = username - } - if credential, ok := server["credential"].(string); ok { - ice.Credential = credential - } - return ice, true -} - -func (p *Peer) applyServerHelloConfig(serverHello map[string]interface{}) { - rawCfg, ok := serverHello["rtcConfiguration"].(map[string]interface{}) - if !ok { - return - } - - rawServers, ok := rawCfg["iceServers"].([]interface{}) - if !ok || len(rawServers) == 0 { - return - } - - iceServers := make([]webrtc.ICEServer, 0, len(rawServers)) - for _, rawServer := range rawServers { - if ice, ok := parseICEServer(rawServer); ok { - iceServers = append(iceServers, ice) - } - } - - if len(iceServers) == 0 { - return - } - - cfg := webrtc.Configuration{ - ICEServers: iceServers, - SDPSemantics: webrtc.SDPSemanticsUnifiedPlan, - } - - if p.pcSub != nil { - _ = p.pcSub.SetConfiguration(cfg) - } - if p.pcPub != nil { - _ = p.pcPub.SetConfiguration(cfg) - } -} - -func (p *Peer) publisherTrackDescriptions() []map[string]interface{} { - if p.pcPub == nil { - return nil - } - - tracks := make([]map[string]interface{}, 0) - for _, transceiver := range p.pcPub.GetTransceivers() { - sender := transceiver.Sender() - if sender == nil { - continue - } - - track := sender.Track() - if track == nil { - continue - } - - kind := "VIDEO" - if track.Kind() == webrtc.RTPCodecTypeAudio { - kind = "AUDIO" - } - - tracks = append(tracks, map[string]interface{}{ - "mid": transceiver.Mid(), - "transceiverMid": transceiver.Mid(), - "kind": kind, - "priority": 0, - "label": track.ID(), - "codecs": map[string]interface{}{}, - "groupId": 1, - keyDescription: "", - }) - } - - return tracks -} - -func telemostCapabilitiesOffer() map[string]interface{} { - return map[string]interface{}{ - "offerAnswerMode": []string{"SEPARATE"}, - "initialSubscriberOffer": []string{"ON_HELLO"}, - "slotsMode": []string{"FROM_CONTROLLER"}, - "simulcastMode": []string{"DISABLED", "STATIC"}, - "selfVadStatus": []string{"FROM_SERVER", "FROM_CLIENT"}, - "dataChannelSharing": []string{"TO_RTP"}, - "videoEncoderConfig": []string{"NO_CONFIG", "ONLY_INIT_CONFIG", "RUNTIME_CONFIG"}, - "dataChannelVideoCodec": []string{"VP8", "UNIQUE_CODEC_FROM_TRACK_DESCRIPTION"}, - "bandwidthLimitationReason": []string{ - "BANDWIDTH_REASON_DISABLED", - "BANDWIDTH_REASON_ENABLED", - }, - "sdkDefaultDeviceManagement": []string{ - "SDK_DEFAULT_DEVICE_MANAGEMENT_DISABLED", - "SDK_DEFAULT_DEVICE_MANAGEMENT_ENABLED", - }, - "joinOrderLayout": []string{"JOIN_ORDER_LAYOUT_DISABLED", "JOIN_ORDER_LAYOUT_ENABLED"}, - "pinLayout": []string{"PIN_LAYOUT_DISABLED"}, - "sendSelfViewVideoSlot": []string{ - "SEND_SELF_VIEW_VIDEO_SLOT_DISABLED", - "SEND_SELF_VIEW_VIDEO_SLOT_ENABLED", - }, - "serverLayoutTransition": []string{"SERVER_LAYOUT_TRANSITION_DISABLED"}, - "sdkPublisherOptimizeBitrate": []string{ - "SDK_PUBLISHER_OPTIMIZE_BITRATE_DISABLED", - "SDK_PUBLISHER_OPTIMIZE_BITRATE_FULL", - "SDK_PUBLISHER_OPTIMIZE_BITRATE_ONLY_SELF", - }, - "sdkNetworkLostDetection": []string{"SDK_NETWORK_LOST_DETECTION_DISABLED"}, - "sdkNetworkPathMonitor": []string{"SDK_NETWORK_PATH_MONITOR_DISABLED"}, - "publisherVp9": []string{"PUBLISH_VP9_DISABLED", "PUBLISH_VP9_ENABLED"}, - "svcMode": []string{"SVC_MODE_DISABLED", "SVC_MODE_L3T3", "SVC_MODE_L3T3_KEY"}, - "subscriberOfferAsyncAck": []string{"SUBSCRIBER_OFFER_ASYNC_ACK_DISABLED", "SUBSCRIBER_OFFER_ASYNC_ACK_ENABLED"}, - "androidBluetoothRoutingFix": []string{ - "ANDROID_BLUETOOTH_ROUTING_FIX_DISABLED", - }, - "fixedIceCandidatesPoolSize": []string{ - "FIXED_ICE_CANDIDATES_POOL_SIZE_DISABLED", - }, - "sdkAndroidTelecomIntegration": []string{ - "SDK_ANDROID_TELECOM_INTEGRATION_DISABLED", - }, - "setActiveCodecsMode": []string{ - "SET_ACTIVE_CODECS_MODE_DISABLED", - "SET_ACTIVE_CODECS_MODE_VIDEO_ONLY", - }, - "subscriberDtlsPassiveMode": []string{ - "SUBSCRIBER_DTLS_PASSIVE_MODE_DISABLED", - }, - "publisherOpusDred": []string{ - "PUBLISHER_OPUS_DRED_DISABLED", - }, - "publisherOpusLowBitrate": []string{ - "PUBLISHER_OPUS_LOW_BITRATE_DISABLED", - }, - "sdkAndroidDestroySessionOnTaskRemoved": []string{ - "SDK_ANDROID_DESTROY_SESSION_ON_TASK_REMOVED_DISABLED", - }, - "svcModes": []string{"FALSE"}, - "reportTelemetryModes": []string{"TRUE"}, - "keepDefaultDevicesModes": []string{"FALSE"}, - } -} - -func (p *Peer) handleSdpAnswer(answer map[string]interface{}, uid string) { - sdp, _ := answer["sdp"].(string) - if err := p.pcPub.SetRemoteDescription(webrtc.SessionDescription{ - Type: webrtc.SDPTypeAnswer, - SDP: sdp, - }); err != nil { - logger.Debugf("SetRemoteDescription error: %v", err) - } - p.sendAck(uid) -} - -func (p *Peer) handleICE(cand map[string]interface{}) { - candStr, _ := cand["candidate"].(string) - target, _ := cand["target"].(string) - sdpMid, _ := cand["sdpMid"].(string) - sdpMLineIndex, _ := cand["sdpMlineIndex"].(float64) - - parts := strings.Fields(candStr) - if len(parts) < 8 { - return - } - - init := webrtc.ICECandidateInit{ - Candidate: candStr, - SDPMid: &sdpMid, - SDPMLineIndex: func() *uint16 { v := uint16(sdpMLineIndex); return &v }(), - } - - switch target { - case "SUBSCRIBER": - _ = p.pcSub.AddICECandidate(init) - case "PUBLISHER": - _ = p.pcPub.AddICECandidate(init) - } -} - -func (p *Peer) sendAck(uid string) { - if uid == "" { - return - } - - p.wsMu.Lock() - defer p.wsMu.Unlock() - - _ = p.ws.WriteJSON(map[string]interface{}{ - keyUID: uid, - "ack": map[string]interface{}{ - "status": map[string]interface{}{"code": "OK"}, - }, - }) -} - -func (p *Peer) registerAckWaiter(uid string) chan struct{} { - ch := make(chan struct{}) - p.ackMu.Lock() - p.ackWaiters[uid] = ch - p.ackMu.Unlock() - return ch -} - -func (p *Peer) removeAckWaiter(uid string) { - p.ackMu.Lock() - delete(p.ackWaiters, uid) - p.ackMu.Unlock() -} - -func (p *Peer) waitForAck(uid string, ch <-chan struct{}, timeout time.Duration) bool { - if uid == "" { - return false - } - - defer p.removeAckWaiter(uid) - - select { - case <-ch: - return true - case <-time.After(timeout): - return false - case <-p.closeCh: - return false - } -} - -func (p *Peer) resolveAck(uid string) { - if uid == "" { - return - } - - p.ackMu.Lock() - ch := p.ackWaiters[uid] - if ch != nil { - delete(p.ackWaiters, uid) - close(ch) - } - p.ackMu.Unlock() -} - -func (p *Peer) sendPong(uid string) { - p.wsMu.Lock() - defer p.wsMu.Unlock() - - _ = p.ws.WriteJSON(map[string]interface{}{ - keyUID: uid, - "pong": map[string]interface{}{}, - }) -} - -func (p *Peer) startTelemetry(ctx context.Context, serverHello map[string]interface{}) { - endpoint, interval, ok := parseTelemetryCfg(serverHello) - if !ok { - return - } - - if !p.telemetryActive.CompareAndSwap(false, true) { - return - } - - p.wg.Add(1) - go func() { - defer p.wg.Done() - defer p.telemetryActive.Store(false) - - ticker := time.NewTicker(interval) - defer ticker.Stop() - - p.sendTelemetry(ctx, endpoint, "join") - for { - select { - case <-ticker.C: - p.sendTelemetry(ctx, endpoint, "stats") - case <-p.telemetryCh: - p.sendTelemetry(ctx, endpoint, "leave") - return - case <-p.closeCh: - p.sendTelemetry(ctx, endpoint, "leave") - return - } - } - }() -} - -func parseTelemetryCfg(serverHello map[string]interface{}) (string, time.Duration, bool) { - cfg, ok := serverHello["telemetryConfiguration"].(map[string]interface{}) - if !ok { - return "", 0, false - } - - endpoint, ok := cfg["logEndpoint"].(string) - if !ok || endpoint == "" { - endpoint, ok = cfg["endpoint"].(string) - if !ok || endpoint == "" { - endpoint, _ = cfg["url"].(string) - } - } - - if endpoint == "" { - return "", 0, false - } - - interval := defaultTelemetryInterval - if raw, ok := cfg["sendingInterval"].(float64); ok && raw > 0 { - interval = time.Duration(raw) * time.Millisecond - } - - return endpoint, interval, true -} - -func (p *Peer) stopTelemetry() { - if p.telemetryActive.Load() { - select { - case p.telemetryCh <- struct{}{}: - default: - } - } -} - -func (p *Peer) sendTelemetry(ctx context.Context, endpoint, event string) { - body, err := json.Marshal(map[string]interface{}{ - "event": event, - "timestamp": time.Now().UnixMilli(), - "peerId": p.conn.PeerID, - "roomId": p.conn.RoomID, - "displayName": p.name, - "implementation": "olcrtc-go", - "dataChannel": map[string]interface{}{ - "bufferedAmount": p.GetBufferedAmount(), - "sendQueue": len(p.sendQueue), - }, - }) - if err != nil { - return - } - - req, err := http.NewRequestWithContext(ctx, http.MethodPost, endpoint, bytes.NewReader(body)) - if err != nil { - logger.Verbosef("Telemetry req error: %v", err) - return - } - req.Header.Set("Content-Type", "application/json") - req.Header.Set("User-Agent", "Mozilla/5.0 (X11; Linux x86_64; rv:149.0) Gecko/20100101 Firefox/149.0") - req.Header.Set("Origin", "https://telemost.yandex.ru") - req.Header.Set("Referer", p.roomURL) - req.Header.Set("X-Requested-With", "XMLHttpRequest") - req.Header.Set("Client-Instance-Id", uuid.New().String()) - req.Header.Set("X-Telemost-Client-Version", "187.1.0") - req.Header.Set("Idempotency-Key", uuid.New().String()) - - client := protect.NewHTTPClient() - resp, err := client.Do(req) - if err != nil { - logger.Verbosef("Telemetry send error: %v", err) - return - } - defer func() { _ = resp.Body.Close() }() -} - -func (p *Peer) signalEnded(reason string) { - p.closed.Store(true) - p.stopTelemetry() - if p.onEnded != nil { - p.onEnded(reason) - } -} - -func isConferenceEndMessage(msg map[string]interface{}) bool { - for _, key := range []string{"conferenceClosed", "conferenceEnded", "roomClosed", "roomEnded", "callEnded"} { - if _, ok := msg[key]; ok { - return true - } - } - - if raw, ok := msg["conference"].(map[string]interface{}); ok { - if state, _ := raw["state"].(string); isEndedState(state) { - return true - } - } - - if raw, ok := msg["conferenceState"].(map[string]interface{}); ok { - if state, _ := raw["state"].(string); isEndedState(state) { - return true - } - } - - return false -} - -func isEndedState(state string) bool { - switch strings.ToLower(state) { - case "closed", "ended", "finished", stateTerminated: - return true - default: - return false - } -} - -func (p *Peer) setupICEHandlers() { - p.pcSub.OnICECandidate(func(c *webrtc.ICECandidate) { - if c == nil { - return - } - init := c.ToJSON() - p.wsMu.Lock() - _ = p.ws.WriteJSON(map[string]interface{}{ - keyUID: uuid.New().String(), - "webrtcIceCandidate": map[string]interface{}{ - "candidate": init.Candidate, - "sdpMid": init.SDPMid, - "sdpMlineIndex": init.SDPMLineIndex, - "target": "SUBSCRIBER", - keyPcSeq: 1, - }, - }) - p.wsMu.Unlock() - }) - - p.pcPub.OnICECandidate(func(c *webrtc.ICECandidate) { - if c == nil { - return - } - init := c.ToJSON() - p.wsMu.Lock() - _ = p.ws.WriteJSON(map[string]interface{}{ - keyUID: uuid.New().String(), - "webrtcIceCandidate": map[string]interface{}{ - "candidate": init.Candidate, - "sdpMid": init.SDPMid, - "sdpMlineIndex": init.SDPMLineIndex, - "target": "PUBLISHER", - keyPcSeq: 1, - }, - }) - p.wsMu.Unlock() - }) -} - -func (p *Peer) sendLeave(uid string) bool { - p.wsMu.Lock() - defer p.wsMu.Unlock() - - if p.ws == nil { - return false - } - - leave := map[string]interface{}{ - keyUID: uid, - "leave": map[string]interface{}{}, - } - - if err := p.ws.WriteJSON(leave); err != nil { - return false - } - return true -} - -// Close closes the peer connection and cleans up resources. -func (p *Peer) Close() error { - alreadyClosing := p.closed.Swap(true) - p.sendQueueClosed.Store(true) - - if !alreadyClosing { - leaveUID := uuid.New().String() - leaveAck := p.registerAckWaiter(leaveUID) - if p.sendLeave(leaveUID) { - _ = p.waitForAck(leaveUID, leaveAck, 1500*time.Millisecond) - } else { - p.removeAckWaiter(leaveUID) - } - } - - closeSignal(p.closeCh) - p.stopSession() - - if p.dc != nil { - _ = p.dc.Close() - } - if p.pcPub != nil { - _ = p.pcPub.Close() - } - if p.pcSub != nil { - _ = p.pcSub.Close() - } - if p.ws != nil { - p.wsMu.Lock() - _ = p.ws.WriteControl(websocket.CloseMessage, - websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""), - time.Now().Add(time.Second)) - _ = p.ws.Close() - p.wsMu.Unlock() - } - - done := make(chan struct{}) - go func() { - p.wg.Wait() - close(done) - }() - - select { - case <-done: - case <-time.After(2 * time.Second): - } - - return nil -} - -func (p *Peer) keepAlive(keepAliveCh <-chan struct{}) { - wsTicker := time.NewTicker(30 * time.Second) - defer wsTicker.Stop() - appTicker := time.NewTicker(5 * time.Second) - defer appTicker.Stop() - - for { - select { - case <-wsTicker.C: - if !p.sendWSPing() { - return - } - case <-appTicker.C: - if !p.sendAppPing() { - return - } - case <-keepAliveCh: - return - case <-p.closeCh: - return - } - } -} - -func (p *Peer) sendWSPing() bool { - p.wsMu.Lock() - defer p.wsMu.Unlock() - if p.ws != nil { - if err := p.ws.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(10*time.Second)); err != nil { - logger.Debugf("ws ping error: %v", err) - p.queueReconnect() - return false - } - } - return true -} - -func (p *Peer) sendAppPing() bool { - p.wsMu.Lock() - defer p.wsMu.Unlock() - if p.ws != nil { - if err := p.ws.WriteJSON(map[string]interface{}{ - keyUID: uuid.New().String(), - "ping": map[string]interface{}{}, - }); err != nil { - logger.Debugf("app ping error: %v", err) - p.queueReconnect() - return false - } - } - return true -} - -func (p *Peer) reconnect(ctx context.Context) error { - p.reconnecting.Store(true) - defer p.reconnecting.Store(false) - - p.sendLeave(uuid.New().String()) - time.Sleep(500 * time.Millisecond) - p.stopSession() - - if p.dc != nil { - _ = p.dc.Close() - } - if p.pcPub != nil { - _ = p.pcPub.Close() - } - if p.pcSub != nil { - _ = p.pcSub.Close() - } - if p.ws != nil { - p.wsMu.Lock() - _ = p.ws.WriteControl(websocket.CloseMessage, - websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""), - time.Now().Add(time.Second)) - _ = p.ws.Close() - p.wsMu.Unlock() - } - - if p.onReconnect != nil { - p.onReconnect(nil) - } - - time.Sleep(3 * time.Second) - conn, err := GetConnectionInfo(ctx, p.roomURL, p.name) - if err != nil { - return fmt.Errorf("reconnect get info: %w", err) - } - p.conn = conn - - if err := p.Connect(ctx); err != nil { - return err - } - - if p.onReconnect != nil { - p.onReconnect(p.dc) - } - p.drainReconnectQueue() - return nil -} - -// SetReconnectCallback sets the callback for reconnection events. -func (p *Peer) SetReconnectCallback(cb func(*webrtc.DataChannel)) { - p.onReconnect = cb -} - -// SetShouldReconnect sets the policy for reconnection. -func (p *Peer) SetShouldReconnect(fn func() bool) { - p.shouldReconnect = fn -} - -// WatchConnection monitors the connection lifecycle. -func (p *Peer) WatchConnection(ctx context.Context) { - const maxReconnects = 10 - const reconnectWindow = 5 * time.Minute - - for { - select { - case <-ctx.Done(): - return - case <-p.closeCh: - return - case <-p.reconnectCh: - if p.handleReconnectAttempt(ctx, maxReconnects, reconnectWindow) { - return - } - } - } -} - -func (p *Peer) handleReconnectAttempt(ctx context.Context, maxReconnects int, reconnectWindow time.Duration) bool { - if time.Since(p.lastReconnect) > reconnectWindow { - p.reconnectCount = 0 - } - p.reconnectCount++ - p.lastReconnect = time.Now() - - if p.reconnectCount > maxReconnects { - p.signalEnded("reconnect limit reached") - return true - } - - backoff := time.Duration(p.reconnectCount) * 2 * time.Second - if backoff > 30*time.Second { - backoff = 30 * time.Second - } - - return p.retryReconnect(ctx, backoff) -} - -func (p *Peer) retryReconnect(ctx context.Context, backoff time.Duration) bool { - for { - if err := p.reconnect(ctx); err != nil { - logger.Debugf("reconnect failed: %v", err) - select { - case <-ctx.Done(): - return true - case <-p.closeCh: - return true - case <-time.After(backoff): - continue - } - } - break - } - return false -} - -func (p *Peer) processSendQueue(workerID int, sessionCloseCh <-chan struct{}) { - for { - select { - case <-sessionCloseCh: - return - case <-p.closeCh: - return - case data := <-p.sendQueue: - if len(data) > p.trafficShape.MaxMessageSize { - logger.Debugf("oversized message size=%d limit=%d", len(data), p.trafficShape.MaxMessageSize) - continue - } - - waited, err := p.waitBufferedAmount(workerID, sessionCloseCh) - if err != nil { - return - } - if waited > 0 { - logger.Verbosef("[WORKER-%d] Drained after %v", workerID, waited) - } - - if err := p.dc.Send(data); err != nil { - logger.Debugf("send error: %v", err) - p.queueReconnect() - return - } - - if p.trafficShape.MinDelay > 0 { - time.Sleep(p.calculateDelay()) - } - } - } -} - -func (p *Peer) waitBufferedAmount(workerID int, sessionCloseCh <-chan struct{}) (time.Duration, error) { - start := time.Now() - for p.dc.BufferedAmount() > 512*1024 { - select { - case <-sessionCloseCh: - return 0, ErrSessionClosed - case <-p.closeCh: - return 0, ErrPeerClosed - case <-time.After(10 * time.Millisecond): - if time.Since(start) > 5*time.Second { - logger.Debugf("buffer wait timeout worker=%d", workerID) - return time.Since(start), nil - } - } - } - return time.Since(start), nil -} - -func (p *Peer) calculateDelay() time.Duration { - minDelay := p.trafficShape.MinDelay - maxDelay := p.trafficShape.MaxDelay - if maxDelay <= minDelay { - return minDelay - } - return minDelay + time.Duration(rand.Int64N(int64(maxDelay-minDelay))) //nolint:gosec,lll // G404: non-cryptographic shaping randomness -} - -// CanSend checks if data can be sent. -func (p *Peer) CanSend() bool { - if p.onData == nil { - if p.hasLocalVideoTracks() { - return !p.closed.Load() && p.subscriberReady.Load() && p.publisherReady.Load() - } - return !p.closed.Load() && p.subscriberReady.Load() - } - if p.dc == nil || p.dc.ReadyState() != webrtc.DataChannelStateOpen { - return false - } - return len(p.sendQueue) < 4000 -} - -var ( - // ErrPublisherNotInitialized is returned when the publisher peer connection is not set up. - ErrPublisherNotInitialized = errors.New("publisher peer connection not initialized") -) - -// AddVideoTrack adds a video track to the publisher peer connection. -func (p *Peer) AddVideoTrack(track webrtc.TrackLocal) error { - p.videoTrackMu.Lock() - p.videoTracks = append(p.videoTracks, track) - p.videoTrackMu.Unlock() - - if p.pcPub == nil { - return nil - } - if _, err := p.pcPub.AddTrack(track); err != nil { - return fmt.Errorf("failed to add track: %w", err) - } - return nil -} - -// SetVideoTrackHandler registers a callback for remote video tracks. -func (p *Peer) SetVideoTrackHandler(cb func(*webrtc.TrackRemote, *webrtc.RTPReceiver)) { - p.videoTrackMu.Lock() - defer p.videoTrackMu.Unlock() - p.onVideoTrack = cb -} diff --git a/internal/provider/telemost/peer_helpers_test.go b/internal/provider/telemost/peer_helpers_test.go deleted file mode 100644 index de892e4..0000000 --- a/internal/provider/telemost/peer_helpers_test.go +++ /dev/null @@ -1,196 +0,0 @@ -package telemost - -import ( - "testing" - "time" - - "github.com/pion/webrtc/v4" -) - -func TestCloseSignal(t *testing.T) { - closeSignal(nil) - - ch := make(chan struct{}) - closeSignal(ch) - select { - case <-ch: - default: - t.Fatal("closeSignal() did not close channel") - } - closeSignal(ch) -} - -func TestTrafficShapeAndDelay(t *testing.T) { - p := &Peer{} - p.SetTrafficShape(TrafficShape{MaxMessageSize: -1, MinDelay: 5 * time.Millisecond, MaxDelay: 2 * time.Millisecond}) - if p.trafficShape.MaxMessageSize != realDataChannelMessageLimit { - t.Fatalf("MaxMessageSize = %d, want default", p.trafficShape.MaxMessageSize) - } - if p.trafficShape.MaxDelay != p.trafficShape.MinDelay { - t.Fatalf("MaxDelay = %v, want %v", p.trafficShape.MaxDelay, p.trafficShape.MinDelay) - } - if got := p.calculateDelay(); got != 5*time.Millisecond { - t.Fatalf("calculateDelay() = %v, want 5ms", got) - } - - p.SetTrafficShape(TrafficShape{MaxMessageSize: 10, MinDelay: time.Millisecond, MaxDelay: 4 * time.Millisecond}) - for range 20 { - got := p.calculateDelay() - if got < time.Millisecond || got >= 4*time.Millisecond { - t.Fatalf("calculateDelay() = %v, out of range", got) - } - } -} - -func TestICEParsingFiltersTURN(t *testing.T) { - if isNonTURNURL("") || isNonTURNURL("turn:host") || isNonTURNURL("turns:host") { - t.Fatal("isNonTURNURL accepted empty or TURN URL") - } - if !isNonTURNURL("stun:host") { - t.Fatal("isNonTURNURL rejected STUN URL") - } - - urls := parseICEURLs(map[string]interface{}{"urls": []interface{}{"turn:x", "stun:a", 123, "turns:y"}}) //nolint:goconst,lll // test literal, repetition is intentional - if len(urls) != 1 || urls[0] != "stun:a" { - t.Fatalf("parseICEURLs(interface) = %v, want [stun:a]", urls) - } - - urls = parseICEURLs(map[string]interface{}{"urls": []string{"stun:a", "turn:b"}}) - if len(urls) != 1 || urls[0] != "stun:a" { - t.Fatalf("parseICEURLs(strings) = %v, want [stun:a]", urls) - } -} - -func TestParseICEServer(t *testing.T) { - if _, ok := parseICEServer("bad"); ok { - t.Fatal("parseICEServer() accepted non-map") - } - if _, ok := parseICEServer(map[string]interface{}{"urls": []interface{}{"turn:x"}}); ok { - t.Fatal("parseICEServer() accepted TURN-only server") - } - - ice, ok := parseICEServer(map[string]interface{}{ - "urls": []interface{}{"stun:a", "turn:b"}, - "username": "user", - "credential": "pass", - }) - if !ok { - t.Fatal("parseICEServer() ok = false") - } - if len(ice.URLs) != 1 || ice.URLs[0] != "stun:a" || ice.Username != "user" || ice.Credential != "pass" { - t.Fatalf("parseICEServer() = %+v", ice) - } -} - -func TestConferenceEndParsing(t *testing.T) { - for _, msg := range []map[string]interface{}{ - {"conferenceClosed": true}, - {"conference": map[string]interface{}{"state": "ENDED"}}, //nolint:goconst // test literal, repetition is intentional - {"conferenceState": map[string]interface{}{"state": "terminated"}}, - } { - if !isConferenceEndMessage(msg) { - t.Fatalf("isConferenceEndMessage(%v) = false", msg) - } - } - if isConferenceEndMessage(map[string]interface{}{"conference": map[string]interface{}{"state": "open"}}) { - t.Fatal("isConferenceEndMessage() accepted active conference") - } - - for _, state := range []string{"closed", "ended", "finished", "terminated"} { - if !isEndedState(state) { - t.Fatalf("isEndedState(%q) = false", state) - } - } - if isEndedState("active") { - t.Fatal("isEndedState(active) = true") - } -} - -//nolint:cyclop // table-driven test naturally has many branches -func TestPeerSmallStateHelpers(t *testing.T) { - p := &Peer{ - reconnectCh: make(chan struct{}, 1), - closeCh: make(chan struct{}), - sendQueue: make(chan []byte, 2), - ackWaiters: make(map[string]chan struct{}), - } - p.SetEndedCallback(func(string) {}) - if p.onEnded == nil { - t.Fatal("SetEndedCallback() did not store callback") - } - p.SetReconnectCallback(func(*webrtc.DataChannel) {}) - if p.onReconnect == nil { - t.Fatal("SetReconnectCallback() did not store callback") - } - p.SetShouldReconnect(func() bool { return true }) - if p.shouldReconnect == nil || !p.shouldReconnect() { - t.Fatal("SetShouldReconnect() did not store callback") - } - - p.subscriberReady.Store(true) - if !p.CanSend() { - t.Fatal("CanSend() = false for subscriber-only ready peer") - } - p.closed.Store(true) - if p.CanSend() { - t.Fatal("CanSend() = true for closed peer") - } - - ch := p.registerAckWaiter("uid-1") - p.resolveAck("uid-1") - select { - case <-ch: - default: - t.Fatal("resolveAck() did not close waiter") - } - if p.waitForAck("", make(chan struct{}), time.Millisecond) { - t.Fatal("waitForAck(empty uid) = true") - } - - ch = p.registerAckWaiter("uid-2") - go p.resolveAck("uid-2") - if !p.waitForAck("uid-2", ch, time.Second) { - t.Fatal("waitForAck() = false after resolveAck") - } - - if err := p.AddVideoTrack(nil); err != nil { - t.Fatalf("AddVideoTrack(nil) error = %v", err) - } - if !p.hasLocalVideoTracks() { - t.Fatal("hasLocalVideoTracks() = false after AddVideoTrack") - } - p.SetVideoTrackHandler(func(*webrtc.TrackRemote, *webrtc.RTPReceiver) {}) - if p.videoTrackHandler() == nil { - t.Fatal("videoTrackHandler() = nil") - } -} - -func TestTelemetryCfgParsing(t *testing.T) { - if _, _, ok := parseTelemetryCfg(map[string]interface{}{}); ok { - t.Fatal("parseTelemetryCfg() accepted missing config") - } - if _, _, ok := parseTelemetryCfg(map[string]interface{}{ - "telemetryConfiguration": map[string]interface{}{}, //nolint:goconst // test literal, repetition is intentional - }); ok { - t.Fatal("parseTelemetryCfg() accepted missing endpoint") - } - - endpoint, interval, ok := parseTelemetryCfg(map[string]interface{}{ - "telemetryConfiguration": map[string]interface{}{ - "endpoint": "https://example.test/log", - "sendingInterval": float64(250), - }, - }) - if !ok || endpoint != "https://example.test/log" || interval != 250*time.Millisecond { - t.Fatalf("parseTelemetryCfg() = (%q, %v, %v)", endpoint, interval, ok) - } - - endpoint, interval, ok = parseTelemetryCfg(map[string]interface{}{ - "telemetryConfiguration": map[string]interface{}{ - "url": "https://example.test/url", - }, - }) - if !ok || endpoint != "https://example.test/url" || interval != defaultTelemetryInterval { - t.Fatalf("parseTelemetryCfg(default) = (%q, %v, %v)", endpoint, interval, ok) - } -} diff --git a/internal/provider/telemost/provider.go b/internal/provider/telemost/provider.go deleted file mode 100644 index c9ee6f2..0000000 --- a/internal/provider/telemost/provider.go +++ /dev/null @@ -1,84 +0,0 @@ -// Package telemost implements the Yandex Telemost WebRTC provider. -package telemost - -import ( - "context" - "fmt" - - "github.com/openlibrecommunity/olcrtc/internal/provider" - "github.com/pion/webrtc/v4" -) - -type telemostProvider struct { - peer *Peer -} - -// New creates a new Telemost provider instance. -func New(ctx context.Context, cfg provider.Config) (provider.Provider, error) { - peer, err := NewPeer(ctx, cfg.RoomURL, cfg.Name, cfg.OnData) - if err != nil { - return nil, fmt.Errorf("create telemost peer: %w", err) - } - - return &telemostProvider{peer: peer}, nil -} - -// Connect starts the provider connection. -func (t *telemostProvider) Connect(ctx context.Context) error { - return t.peer.Connect(ctx) -} - -// Send transmits data to the room. -func (t *telemostProvider) Send(data []byte) error { - return t.peer.Send(data) -} - -// Close terminates the provider connection. -func (t *telemostProvider) Close() error { - return t.peer.Close() -} - -// SetReconnectCallback sets the function to call on reconnection. -func (t *telemostProvider) SetReconnectCallback(cb func(*webrtc.DataChannel)) { - t.peer.SetReconnectCallback(cb) -} - -// SetShouldReconnect sets the function to determine if reconnection should occur. -func (t *telemostProvider) SetShouldReconnect(fn func() bool) { - t.peer.SetShouldReconnect(fn) -} - -// SetEndedCallback sets the function to call when the session ends. -func (t *telemostProvider) SetEndedCallback(cb func(string)) { - t.peer.SetEndedCallback(cb) -} - -// WatchConnection monitors the provider connection state. -func (t *telemostProvider) WatchConnection(ctx context.Context) { - t.peer.WatchConnection(ctx) -} - -// CanSend checks if the provider is ready to transmit data. -func (t *telemostProvider) CanSend() bool { - return t.peer.CanSend() -} - -// GetSendQueue returns the data transmission queue. -func (t *telemostProvider) GetSendQueue() chan []byte { - return t.peer.GetSendQueue() -} - -// GetBufferedAmount returns the current WebRTC buffered amount. -func (t *telemostProvider) GetBufferedAmount() uint64 { - return t.peer.GetBufferedAmount() -} - -// AddVideoTrack adds a video track to the telemost connection. -func (t *telemostProvider) AddVideoTrack(track webrtc.TrackLocal) error { - return t.peer.AddVideoTrack(track) -} - -// SetVideoTrackHandler registers a callback for subscribed remote video tracks. -func (t *telemostProvider) SetVideoTrackHandler(cb func(*webrtc.TrackRemote, *webrtc.RTPReceiver)) { - t.peer.SetVideoTrackHandler(cb) -} diff --git a/internal/provider/telemost/provider_test.go b/internal/provider/telemost/provider_test.go deleted file mode 100644 index e29d008..0000000 --- a/internal/provider/telemost/provider_test.go +++ /dev/null @@ -1,55 +0,0 @@ -package telemost - -import ( - "context" - "errors" - "testing" - - "github.com/pion/webrtc/v4" -) - -//nolint:cyclop // table-driven test naturally has many branches -func TestTelemostProviderForwardsPeerMethods(t *testing.T) { - peer := &Peer{ - reconnectCh: make(chan struct{}, 1), - closeCh: make(chan struct{}), - sendQueue: make(chan []byte, 1), - ackWaiters: make(map[string]chan struct{}), - } - p := &telemostProvider{peer: peer} - - p.SetReconnectCallback(func(*webrtc.DataChannel) {}) - p.SetShouldReconnect(func() bool { return true }) - p.SetEndedCallback(func(string) {}) - p.SetVideoTrackHandler(func(*webrtc.TrackRemote, *webrtc.RTPReceiver) {}) - if peer.onReconnect == nil || peer.shouldReconnect == nil || peer.onEnded == nil || peer.onVideoTrack == nil { - t.Fatal("callbacks were not forwarded") - } - - if p.GetSendQueue() != peer.sendQueue { - t.Fatal("GetSendQueue() did not forward") - } - if p.GetBufferedAmount() != 0 { - t.Fatal("GetBufferedAmount() != 0 with nil datachannel") - } - if err := p.AddVideoTrack(nil); err != nil { - t.Fatalf("AddVideoTrack(nil) error = %v", err) - } - if p.CanSend() { - t.Fatal("CanSend() = true for unready peer") - } - - done := make(chan struct{}) - go func() { - p.WatchConnection(context.Background()) - close(done) - }() - if err := p.Close(); err != nil { - t.Fatalf("Close() error = %v", err) - } - <-done - - if err := p.Send([]byte("x")); !errors.Is(err, ErrDataChannelNotReady) { - t.Fatalf("Send() error = %v, want datachannel not ready", err) - } -} diff --git a/internal/provider/telemost/state_helpers_test.go b/internal/provider/telemost/state_helpers_test.go deleted file mode 100644 index 08f9362..0000000 --- a/internal/provider/telemost/state_helpers_test.go +++ /dev/null @@ -1,85 +0,0 @@ -package telemost - -import ( - "testing" - "time" -) - -//nolint:cyclop // table-driven test naturally has many branches -func TestSessionReconnectAndEndedHelpers(t *testing.T) { - p := &Peer{ - reconnectCh: make(chan struct{}, 2), - closeCh: make(chan struct{}), - keepAliveCh: make(chan struct{}), - sessionCloseCh: make(chan struct{}), - telemetryCh: make(chan struct{}, 1), - } - - keepAliveCh, sessionCloseCh := p.resetSession() - if keepAliveCh == nil || sessionCloseCh == nil || keepAliveCh != p.keepAliveCh || sessionCloseCh != p.sessionCloseCh { - t.Fatal("resetSession() did not replace session channels") - } - - p.subscriberReady.Store(true) - p.publisherReady.Store(true) - p.resetMediaState() - if p.subscriberReady.Load() || p.publisherReady.Load() || p.subscriberConn == nil || p.publisherConn == nil { - t.Fatal("resetMediaState() did not reset readiness") - } - - p.queueReconnect() - select { - case <-p.reconnectCh: - default: - t.Fatal("queueReconnect() did not enqueue") - } - - p.SetShouldReconnect(func() bool { return false }) - p.queueReconnect() - select { - case <-p.reconnectCh: - t.Fatal("queueReconnect() enqueued despite policy=false") - default: - } - - p.reconnectCh <- struct{}{} - p.reconnectCh <- struct{}{} - p.drainReconnectQueue() - select { - case <-p.reconnectCh: - t.Fatal("drainReconnectQueue() left queued item") - default: - } - - p.telemetryActive.Store(true) - p.stopTelemetry() - select { - case <-p.telemetryCh: - default: - t.Fatal("stopTelemetry() did not signal active telemetry") - } - - ended := "" - p.SetEndedCallback(func(reason string) { ended = reason }) - p.signalEnded("done") - if !p.closed.Load() || ended != "done" { - t.Fatalf("signalEnded() closed=%v reason=%q", p.closed.Load(), ended) - } -} - -func TestWaitForAckTimeoutAndClose(t *testing.T) { - p := &Peer{ - closeCh: make(chan struct{}), - ackWaiters: make(map[string]chan struct{}), - } - ch := p.registerAckWaiter("timeout") - if p.waitForAck("timeout", ch, time.Millisecond) { - t.Fatal("waitForAck(timeout) = true") - } - - ch = p.registerAckWaiter("closed") - close(p.closeCh) - if p.waitForAck("closed", ch, time.Second) { - t.Fatal("waitForAck(closeCh) = true") - } -}