refactor: migrate telemost to engine/goolom + auth/telemost

Decompose the monolithic internal/provider/telemost package into two
orthogonal layers: engine/goolom (Yandex proprietary SFU wire protocol —
WebSocket signaling, dual pub/sub PeerConnections, DataChannel, telemetry)
and auth/telemost (HTTP connection-info fetch → engine.Credentials).

Add engine.Config.Refresh callback so Goolom can obtain fresh peerID and
credentials on every reconnect without a direct dependency on the auth
package. engine_adapter wires the Refresh closure from authProvider.Issue.

Delete internal/provider/ entirely (telemost was the last tenant) and
remove the now-obsolete provider_adapter + its test from builtin.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
zarazaex69
2026-05-11 13:13:21 +03:00
parent d48eb565f5
commit d65784ff8c
19 changed files with 1687 additions and 2424 deletions

View File

@@ -1,3 +1,9 @@
// Package telemost is the auth provider for the Yandex Telemost service.
// It fetches the connection metadata (media server URL, peer ID, room ID,
// signing credentials) the Goolom engine needs to join a conference.
//
// Telemost does not expose an API to create rooms — they originate in the
// Yandex UI — so this provider does not implement auth.RoomCreator.
package telemost
import (
@@ -71,6 +77,5 @@ func GetConnectionInfo(ctx context.Context, roomURL, displayName string) (*Conne
if err := json.NewDecoder(resp.Body).Decode(&info); err != nil {
return nil, fmt.Errorf("failed to decode response: %w", err)
}
return &info, nil
}

View File

@@ -0,0 +1,43 @@
package telemost
import (
"context"
"fmt"
"github.com/openlibrecommunity/olcrtc/internal/auth"
)
// Provider produces Goolom credentials for the Yandex Telemost service.
type Provider struct{}
// Engine reports which engine consumes credentials from this auth provider.
func (Provider) Engine() string { return "goolom" }
// Issue fetches connection info for a Telemost room and returns engine credentials.
//
// cfg.RoomURL must be a Telemost conference URL (e.g.
// https://telemost.yandex.ru/j/<id>). Room creation is not supported by the
// Telemost API; rooms originate in the Yandex UI.
func (Provider) Issue(ctx context.Context, cfg auth.Config) (auth.Credentials, error) {
if cfg.RoomURL == "" {
return auth.Credentials{}, auth.ErrRoomIDRequired
}
info, err := GetConnectionInfo(ctx, cfg.RoomURL, cfg.Name)
if err != nil {
return auth.Credentials{}, fmt.Errorf("get connection info: %w", err)
}
return auth.Credentials{
URL: info.ClientConfig.MediaServerURL,
Token: info.PeerID,
Extra: map[string]string{
"roomID": info.RoomID,
"credentials": info.Credentials,
"roomURL": cfg.RoomURL,
"telemetryReferer": cfg.RoomURL,
},
}, nil
}
func init() { //nolint:gochecknoinits // auth registration is the canonical Go pattern for plugins
auth.Register("telemost", Provider{})
}

View File

@@ -15,13 +15,14 @@ import (
// reports.
func registerEngineAuth(carrierName string, authProvider auth.Provider) {
carrier.Register(carrierName, func(ctx context.Context, cfg carrier.Config) (carrier.Session, error) {
creds, err := authProvider.Issue(ctx, auth.Config{
authCfg := auth.Config{
RoomURL: cfg.RoomURL,
Name: cfg.Name,
DNSServer: cfg.DNSServer,
ProxyAddr: cfg.ProxyAddr,
ProxyPort: cfg.ProxyPort,
})
}
creds, err := authProvider.Issue(ctx, authCfg)
if err != nil {
return nil, fmt.Errorf("auth issue: %w", err)
}
@@ -35,6 +36,13 @@ func registerEngineAuth(carrierName string, authProvider auth.Provider) {
DNSServer: cfg.DNSServer,
ProxyAddr: cfg.ProxyAddr,
ProxyPort: cfg.ProxyPort,
Refresh: func(ctx context.Context) (engine.Credentials, error) {
fresh, err := authProvider.Issue(ctx, authCfg)
if err != nil {
return engine.Credentials{}, fmt.Errorf("auth refresh: %w", err)
}
return engine.Credentials{URL: fresh.URL, Token: fresh.Token, Extra: fresh.Extra}, nil
},
})
if err != nil {
return nil, fmt.Errorf("engine new: %w", err)

View File

@@ -1,121 +0,0 @@
package builtin
import (
"context"
"fmt"
"github.com/openlibrecommunity/olcrtc/internal/carrier"
"github.com/openlibrecommunity/olcrtc/internal/provider"
"github.com/pion/webrtc/v4"
)
type providerSession struct {
provider provider.Provider
}
func (s *providerSession) Capabilities() carrier.Capabilities {
caps := carrier.Capabilities{ByteStream: true}
_, caps.VideoTrack = s.provider.(videoTrackProvider)
return caps
}
func (s *providerSession) OpenByteStream() (carrier.ByteStream, error) {
return &providerByteStream{provider: s.provider}, nil
}
func (s *providerSession) OpenVideoTrack() (carrier.VideoTrack, error) {
vtp, ok := s.provider.(videoTrackProvider)
if !ok {
return nil, carrier.ErrVideoTrackUnsupported
}
return &providerVideoTrack{provider: vtp}, nil
}
type videoTrackProvider interface {
provider.Provider
provider.VideoTrackCapable
}
type providerByteStream struct {
provider provider.Provider
}
func (p *providerByteStream) Connect(ctx context.Context) error {
if err := p.provider.Connect(ctx); err != nil {
return fmt.Errorf("connect: %w", err)
}
return nil
}
func (p *providerByteStream) Send(data []byte) error {
if err := p.provider.Send(data); err != nil {
return fmt.Errorf("send: %w", err)
}
return nil
}
func (p *providerByteStream) Close() error {
if err := p.provider.Close(); err != nil {
return fmt.Errorf("close: %w", err)
}
return nil
}
func (p *providerByteStream) SetReconnectCallback(cb func()) {
p.provider.SetReconnectCallback(func(_ *webrtc.DataChannel) {
if cb != nil {
cb()
}
})
}
func (p *providerByteStream) SetShouldReconnect(fn func() bool) { p.provider.SetShouldReconnect(fn) }
func (p *providerByteStream) SetEndedCallback(cb func(string)) { p.provider.SetEndedCallback(cb) }
func (p *providerByteStream) WatchConnection(ctx context.Context) {
p.provider.WatchConnection(ctx)
}
func (p *providerByteStream) CanSend() bool { return p.provider.CanSend() }
type providerVideoTrack struct {
provider videoTrackProvider
}
func (v *providerVideoTrack) Connect(ctx context.Context) error {
if err := v.provider.Connect(ctx); err != nil {
return fmt.Errorf("connect: %w", err)
}
return nil
}
func (v *providerVideoTrack) Close() error {
if err := v.provider.Close(); err != nil {
return fmt.Errorf("close: %w", err)
}
return nil
}
func (v *providerVideoTrack) SetReconnectCallback(cb func()) {
v.provider.SetReconnectCallback(func(_ *webrtc.DataChannel) {
if cb != nil {
cb()
}
})
}
func (v *providerVideoTrack) SetShouldReconnect(fn func() bool) { v.provider.SetShouldReconnect(fn) }
func (v *providerVideoTrack) SetEndedCallback(cb func(string)) { v.provider.SetEndedCallback(cb) }
func (v *providerVideoTrack) WatchConnection(ctx context.Context) {
v.provider.WatchConnection(ctx)
}
func (v *providerVideoTrack) CanSend() bool { return v.provider.CanSend() }
func (v *providerVideoTrack) AddTrack(track webrtc.TrackLocal) error {
if err := v.provider.AddVideoTrack(track); err != nil {
return fmt.Errorf("add track: %w", err)
}
return nil
}
func (v *providerVideoTrack) SetTrackHandler(cb func(*webrtc.TrackRemote, *webrtc.RTPReceiver)) {
v.provider.SetVideoTrackHandler(cb)
}

View File

@@ -1,205 +0,0 @@
package builtin
import (
"context"
"errors"
"testing"
"github.com/openlibrecommunity/olcrtc/internal/carrier"
"github.com/pion/webrtc/v4"
)
var (
errConnectBoom = errors.New("connect boom")
errSendBoom = errors.New("send boom")
errCloseBoom = errors.New("close boom")
errTrackBoom = errors.New("track boom")
)
type stubProvider struct {
connectErr error
sendErr error
closeErr error
canSend bool
reconnectCallback func(*webrtc.DataChannel)
shouldReconnect func() bool
endedCallback func(string)
watchCalled bool
addTrackErr error
trackHandlerCalled bool
}
func (s *stubProvider) Connect(context.Context) error { return s.connectErr }
func (s *stubProvider) Send([]byte) error { return s.sendErr }
func (s *stubProvider) Close() error { return s.closeErr }
func (s *stubProvider) SetReconnectCallback(cb func(*webrtc.DataChannel)) { s.reconnectCallback = cb }
func (s *stubProvider) SetShouldReconnect(fn func() bool) { s.shouldReconnect = fn }
func (s *stubProvider) SetEndedCallback(cb func(string)) { s.endedCallback = cb }
func (s *stubProvider) WatchConnection(context.Context) { s.watchCalled = true }
func (s *stubProvider) CanSend() bool { return s.canSend }
func (s *stubProvider) GetSendQueue() chan []byte { return nil }
func (s *stubProvider) GetBufferedAmount() uint64 { return 0 }
func (s *stubProvider) AddVideoTrack(webrtc.TrackLocal) error { return s.addTrackErr }
func (s *stubProvider) SetVideoTrackHandler(func(*webrtc.TrackRemote, *webrtc.RTPReceiver)) {
s.trackHandlerCalled = true
}
type plainProvider struct {
connectErr error
sendErr error
closeErr error
canSend bool
reconnectCallback func(*webrtc.DataChannel)
shouldReconnect func() bool
endedCallback func(string)
watchCalled bool
}
func (p *plainProvider) Connect(context.Context) error { return p.connectErr }
func (p *plainProvider) Send([]byte) error { return p.sendErr }
func (p *plainProvider) Close() error { return p.closeErr }
func (p *plainProvider) SetReconnectCallback(cb func(*webrtc.DataChannel)) { p.reconnectCallback = cb }
func (p *plainProvider) SetShouldReconnect(fn func() bool) { p.shouldReconnect = fn }
func (p *plainProvider) SetEndedCallback(cb func(string)) { p.endedCallback = cb }
func (p *plainProvider) WatchConnection(context.Context) { p.watchCalled = true }
func (p *plainProvider) CanSend() bool { return p.canSend }
func (p *plainProvider) GetSendQueue() chan []byte { return nil }
func (p *plainProvider) GetBufferedAmount() uint64 { return 0 }
func TestProviderSessionOpenVideoTrackUnsupported(t *testing.T) {
sess := &providerSession{provider: &plainProvider{}}
caps := sess.Capabilities()
if !caps.ByteStream || caps.VideoTrack {
t.Fatalf("Capabilities() = %+v, want byte true and video false", caps)
}
_, err := sess.OpenVideoTrack()
if !errors.Is(err, carrier.ErrVideoTrackUnsupported) {
t.Fatalf("OpenVideoTrack() error = %v, want %v", err, carrier.ErrVideoTrackUnsupported)
}
}
func TestProviderByteStreamWrapsProviderAndCallbacks(t *testing.T) {
prov := &stubProvider{canSend: true}
stream := &providerByteStream{provider: prov}
called := false
stream.SetReconnectCallback(func() { called = true })
if prov.reconnectCallback == nil {
t.Fatal("SetReconnectCallback() did not install provider callback")
}
prov.reconnectCallback(nil)
if !called {
t.Fatal("reconnect callback was not adapted")
}
reconnectAllowed := false
stream.SetShouldReconnect(func() bool { reconnectAllowed = true; return true })
if prov.shouldReconnect == nil || !prov.shouldReconnect() || !reconnectAllowed {
t.Fatal("SetShouldReconnect() was not forwarded")
}
ended := ""
stream.SetEndedCallback(func(reason string) { ended = reason })
if prov.endedCallback == nil {
t.Fatal("SetEndedCallback() was not forwarded")
}
prov.endedCallback("bye")
if ended != "bye" {
t.Fatalf("ended callback reason = %q, want bye", ended)
}
stream.WatchConnection(context.Background())
if !prov.watchCalled {
t.Fatal("WatchConnection() was not forwarded")
}
if !stream.CanSend() {
t.Fatal("CanSend() = false, want true")
}
}
func TestProviderByteStreamWrapsErrors(t *testing.T) {
prov := &stubProvider{
connectErr: errConnectBoom,
sendErr: errSendBoom,
closeErr: errCloseBoom,
}
stream := &providerByteStream{provider: prov}
if err := stream.Connect(context.Background()); err == nil || err.Error() != "connect: connect boom" {
t.Fatalf("Connect() error = %v", err)
}
if err := stream.Send([]byte("x")); err == nil || err.Error() != "send: send boom" {
t.Fatalf("Send() error = %v", err)
}
if err := stream.Close(); err == nil || err.Error() != "close: close boom" {
t.Fatalf("Close() error = %v", err)
}
}
func TestProviderSessionOpenByteStreamAndVideoTrack(t *testing.T) {
prov := &stubProvider{canSend: true}
sess := &providerSession{provider: prov}
stream, err := sess.OpenByteStream()
if err != nil {
t.Fatalf("OpenByteStream() error = %v", err)
}
if !stream.CanSend() {
t.Fatal("byte stream CanSend() = false, want true")
}
video, err := sess.OpenVideoTrack()
if err != nil {
t.Fatalf("OpenVideoTrack() error = %v", err)
}
if err := video.Connect(context.Background()); err != nil {
t.Fatalf("video Connect() error = %v", err)
}
if err := video.Close(); err != nil {
t.Fatalf("video Close() error = %v", err)
}
video.SetShouldReconnect(func() bool { return true })
video.SetEndedCallback(func(string) {})
video.WatchConnection(context.Background())
if !video.CanSend() || prov.shouldReconnect == nil || prov.endedCallback == nil || !prov.watchCalled {
t.Fatal("video adapter did not forward calls")
}
}
func TestProviderVideoTrackWrapsOperations(t *testing.T) {
prov := &stubProvider{canSend: true, addTrackErr: errTrackBoom}
track := &providerVideoTrack{provider: prov}
called := false
track.SetReconnectCallback(func() { called = true })
prov.reconnectCallback(nil)
if !called {
t.Fatal("reconnect callback was not adapted")
}
track.SetTrackHandler(func(*webrtc.TrackRemote, *webrtc.RTPReceiver) {})
if !prov.trackHandlerCalled {
t.Fatal("SetTrackHandler() was not forwarded")
}
if err := track.AddTrack(nil); err == nil || err.Error() != "add track: track boom" {
t.Fatalf("AddTrack() error = %v", err)
}
}
func TestProviderVideoTrackWrapsConnectCloseErrors(t *testing.T) {
prov := &stubProvider{
connectErr: errConnectBoom,
closeErr: errCloseBoom,
}
track := &providerVideoTrack{provider: prov}
if err := track.Connect(context.Background()); err == nil || err.Error() != "connect: connect boom" {
t.Fatalf("Connect() error = %v", err)
}
if err := track.Close(); err == nil || err.Error() != "close: close boom" {
t.Fatalf("Close() error = %v", err)
}
}

View File

@@ -2,42 +2,17 @@
package builtin
import (
"context"
authSaluteJazz "github.com/openlibrecommunity/olcrtc/internal/auth/salutejazz"
authTelemost "github.com/openlibrecommunity/olcrtc/internal/auth/telemost"
authWBStream "github.com/openlibrecommunity/olcrtc/internal/auth/wbstream"
"github.com/openlibrecommunity/olcrtc/internal/carrier"
_ "github.com/openlibrecommunity/olcrtc/internal/engine/goolom" // engine registration via init
_ "github.com/openlibrecommunity/olcrtc/internal/engine/livekit" // engine registration via init
_ "github.com/openlibrecommunity/olcrtc/internal/engine/salutejazz" // engine registration via init
"github.com/openlibrecommunity/olcrtc/internal/provider"
"github.com/openlibrecommunity/olcrtc/internal/provider/telemost"
)
type providerFactory func(context.Context, provider.Config) (provider.Provider, error)
// Register wires the built-in carriers into the carrier registry.
func Register() {
// Legacy provider-based carriers (still being migrated to engine+auth).
registerProvider("telemost", telemost.New)
// Migrated to engine+auth.
registerEngineAuth("wbstream", authWBStream.Provider{})
registerEngineAuth("jazz", authSaluteJazz.Provider{})
}
func registerProvider(name string, factory providerFactory) {
carrier.Register(name, func(ctx context.Context, cfg carrier.Config) (carrier.Session, error) {
prov, err := factory(ctx, provider.Config{
RoomURL: cfg.RoomURL,
Name: cfg.Name,
OnData: cfg.OnData,
DNSServer: cfg.DNSServer,
ProxyAddr: cfg.ProxyAddr,
ProxyPort: cfg.ProxyPort,
})
if err != nil {
return nil, err
}
return &providerSession{provider: prov}, nil
})
registerEngineAuth("telemost", authTelemost.Provider{})
}

View File

@@ -30,10 +30,23 @@ type Capabilities struct {
VideoTrack bool
}
// Credentials are produced by an auth provider — duplicated here to avoid an
// import cycle between engine and auth.
type Credentials struct {
URL string
Token string
Extra map[string]string
}
// Config is the runtime input to an engine factory. URL/Token are produced by
// an auth provider (or supplied directly by the caller for "none" auth).
// Extra carries engine-specific fields that don't fit the common shape
// (e.g. SaluteJazz needs a separate room password alongside the room ID).
//
// Refresh, when set, is called by an engine whose protocol requires fresh
// credentials on each reconnect (e.g. Goolom: every reconnect needs a new
// peerID/credentials tuple from the room-info HTTP endpoint). Engines that
// don't need this should ignore it.
type Config struct {
URL string
Token string
@@ -43,6 +56,7 @@ type Config struct {
DNSServer string
ProxyAddr string
ProxyPort int
Refresh func(ctx context.Context) (Credentials, error)
}
// Session is the engine-level runtime handle. It is shaped to match what

View File

@@ -0,0 +1,422 @@
package goolom
import (
"context"
"fmt"
"time"
"github.com/google/uuid"
"github.com/gorilla/websocket"
"github.com/openlibrecommunity/olcrtc/internal/engine"
"github.com/openlibrecommunity/olcrtc/internal/logger"
"github.com/openlibrecommunity/olcrtc/internal/protect"
"github.com/pion/webrtc/v4"
)
// Connect starts the WebRTC connection process.
func (s *Session) Connect(ctx context.Context) error {
s.closed.Store(false)
s.resetMediaState()
config := webrtc.Configuration{
ICEServers: []webrtc.ICEServer{{URLs: []string{"stun:stun.rtc.yandex.net:3478"}}},
SDPSemantics: webrtc.SDPSemanticsUnifiedPlan,
}
if err := s.setupPeerConnections(config); err != nil {
return err
}
keepAliveCh, sessionCloseCh := s.resetSession()
var dcReady chan struct{}
if s.onData != nil {
var err error
s.dc, err = s.pcPub.CreateDataChannel("olcrtc", nil)
if err != nil {
return fmt.Errorf("create dc: %w", err)
}
dcReady = make(chan struct{})
s.setupDataChannelHandlers(dcReady, sessionCloseCh)
}
if err := s.dialWebSocket(); err != nil {
return err
}
s.setupICEHandlers()
s.startBackgroundGoroutines(ctx, keepAliveCh)
if s.onData != nil {
select {
case <-dcReady:
return nil
case <-time.After(15 * time.Second):
return ErrDataChannelTimeout
case <-ctx.Done():
return fmt.Errorf("connect context cancelled: %w", ctx.Err())
}
}
return s.waitForMediaReady(ctx, 20*time.Second)
}
func (s *Session) waitForMediaReady(ctx context.Context, timeout time.Duration) error {
timer := time.NewTimer(timeout)
defer timer.Stop()
select {
case <-s.subscriberConn:
case <-timer.C:
return ErrSubscriberMediaTimeout
case <-ctx.Done():
return fmt.Errorf("connect context cancelled: %w", ctx.Err())
}
return nil
}
func (s *Session) setupPeerConnections(config webrtc.Configuration) error {
settingEngine := webrtc.SettingEngine{}
if protect.Protector != nil {
settingEngine.SetICEProxyDialer(protect.NewProxyDialer())
}
api := webrtc.NewAPI(webrtc.WithSettingEngine(settingEngine))
var err error
s.pcSub, err = api.NewPeerConnection(config)
if err != nil {
return fmt.Errorf("new sub pc: %w", err)
}
s.pcSub.OnConnectionStateChange(s.onSubscriberConnectionStateChange)
s.pcSub.OnTrack(func(track *webrtc.TrackRemote, receiver *webrtc.RTPReceiver) {
if track.Kind() != webrtc.RTPCodecTypeVideo {
return
}
logger.Infof("goolom remote video track: codec=%s stream=%s track=%s",
track.Codec().MimeType, track.StreamID(), track.ID())
if cb := s.videoTrackHandler(); cb != nil {
cb(track, receiver)
}
})
s.pcPub, err = api.NewPeerConnection(config)
if err != nil {
return fmt.Errorf("new pub pc: %w", err)
}
s.pcPub.OnConnectionStateChange(s.onPublisherConnectionStateChange)
if err := s.attachPendingVideoTracks(); err != nil {
return err
}
return nil
}
func (s *Session) dialWebSocket() error {
wsDialer := websocket.Dialer{
NetDialContext: protect.DialContext,
HandshakeTimeout: wsHandshakeTimeout,
}
ws, resp, err := wsDialer.Dial(s.mediaServerURL, nil)
if err != nil {
return fmt.Errorf("dial ws: %w", err)
}
if resp != nil && resp.Body != nil {
_ = resp.Body.Close()
}
s.ws = ws
ws.SetPongHandler(func(string) error {
_ = ws.SetReadDeadline(time.Now().Add(wsReadTimeout))
return nil
})
_ = ws.SetReadDeadline(time.Now().Add(wsReadTimeout))
return nil
}
func (s *Session) startBackgroundGoroutines(ctx context.Context, keepAliveCh chan struct{}) {
s.wg.Add(1)
go func() {
defer s.wg.Done()
s.keepAlive(keepAliveCh)
}()
_ = s.sendHello()
s.wg.Add(1)
go func() {
defer s.wg.Done()
s.handleSignaling(ctx)
}()
}
func (s *Session) onConnectionStateChange(state webrtc.PeerConnectionState) {
if !s.closed.Load() && state == webrtc.PeerConnectionStateFailed {
s.queueReconnect()
}
}
func (s *Session) onSubscriberConnectionStateChange(state webrtc.PeerConnectionState) {
logger.Debugf("goolom subscriber state: %s", state.String())
switch state {
case webrtc.PeerConnectionStateConnected:
s.subscriberReady.Store(true)
closeSignal(s.subscriberConn)
case webrtc.PeerConnectionStateDisconnected,
webrtc.PeerConnectionStateFailed,
webrtc.PeerConnectionStateClosed:
s.subscriberReady.Store(false)
case webrtc.PeerConnectionStateUnknown,
webrtc.PeerConnectionStateNew,
webrtc.PeerConnectionStateConnecting:
}
s.onConnectionStateChange(state)
}
func (s *Session) onPublisherConnectionStateChange(state webrtc.PeerConnectionState) {
logger.Debugf("goolom publisher state: %s", state.String())
switch state {
case webrtc.PeerConnectionStateConnected:
s.publisherReady.Store(true)
closeSignal(s.publisherConn)
case webrtc.PeerConnectionStateDisconnected,
webrtc.PeerConnectionStateFailed,
webrtc.PeerConnectionStateClosed:
s.publisherReady.Store(false)
case webrtc.PeerConnectionStateUnknown,
webrtc.PeerConnectionStateNew,
webrtc.PeerConnectionStateConnecting:
}
s.onConnectionStateChange(state)
}
// Close terminates the session and releases resources.
func (s *Session) Close() error {
alreadyClosing := s.closed.Swap(true)
s.sendQueueClosed.Store(true)
if !alreadyClosing {
leaveUID := uuid.New().String()
leaveAck := s.registerAckWaiter(leaveUID)
if s.sendLeave(leaveUID) {
_ = s.waitForAck(leaveUID, leaveAck, 1500*time.Millisecond)
} else {
s.removeAckWaiter(leaveUID)
}
}
closeSignal(s.closeCh)
s.stopSession()
if s.dc != nil {
_ = s.dc.Close()
}
if s.pcPub != nil {
_ = s.pcPub.Close()
}
if s.pcSub != nil {
_ = s.pcSub.Close()
}
if s.ws != nil {
s.wsMu.Lock()
_ = s.ws.WriteControl(websocket.CloseMessage,
websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""),
time.Now().Add(time.Second))
_ = s.ws.Close()
s.wsMu.Unlock()
}
done := make(chan struct{})
go func() {
s.wg.Wait()
close(done)
}()
select {
case <-done:
case <-time.After(2 * time.Second):
}
return nil
}
// WatchConnection monitors the connection lifecycle and reconnects as needed.
func (s *Session) WatchConnection(ctx context.Context) {
const maxReconnects = 10
const reconnectWindow = 5 * time.Minute
for {
select {
case <-ctx.Done():
return
case <-s.closeCh:
return
case <-s.reconnectCh:
if s.handleReconnectAttempt(ctx, maxReconnects, reconnectWindow) {
return
}
}
}
}
func (s *Session) handleReconnectAttempt(ctx context.Context, maxReconnects int, reconnectWindow time.Duration) bool {
if time.Since(s.lastReconnect) > reconnectWindow {
s.reconnectCount = 0
}
s.reconnectCount++
s.lastReconnect = time.Now()
if s.reconnectCount > maxReconnects {
s.signalEnded("reconnect limit reached")
return true
}
backoff := time.Duration(s.reconnectCount) * 2 * time.Second
if backoff > 30*time.Second {
backoff = 30 * time.Second
}
return s.retryReconnect(ctx, backoff)
}
func (s *Session) retryReconnect(ctx context.Context, backoff time.Duration) bool {
for {
if err := s.reconnect(ctx); err != nil {
logger.Debugf("reconnect failed: %v", err)
select {
case <-ctx.Done():
return true
case <-s.closeCh:
return true
case <-time.After(backoff):
continue
}
}
break
}
return false
}
func (s *Session) reconnect(ctx context.Context) error {
s.reconnecting.Store(true)
defer s.reconnecting.Store(false)
s.sendLeave(uuid.New().String())
time.Sleep(500 * time.Millisecond)
s.stopSession()
if s.dc != nil {
_ = s.dc.Close()
}
if s.pcPub != nil {
_ = s.pcPub.Close()
}
if s.pcSub != nil {
_ = s.pcSub.Close()
}
if s.ws != nil {
s.wsMu.Lock()
_ = s.ws.WriteControl(websocket.CloseMessage,
websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""),
time.Now().Add(time.Second))
_ = s.ws.Close()
s.wsMu.Unlock()
}
if s.onReconnect != nil {
s.onReconnect(nil)
}
time.Sleep(3 * time.Second)
if s.refresh == nil {
return ErrNoRefresh
}
creds, err := s.refresh(ctx)
if err != nil {
return fmt.Errorf("reconnect refresh: %w", err)
}
s.applyRefreshedCredentials(creds)
if err := s.Connect(ctx); err != nil {
return err
}
if s.onReconnect != nil {
s.onReconnect(s.dc)
}
s.drainReconnectQueue()
return nil
}
func (s *Session) applyRefreshedCredentials(creds engine.Credentials) {
if creds.URL != "" {
s.mediaServerURL = creds.URL
}
if creds.Token != "" {
s.peerID = creds.Token
}
if creds.Extra == nil {
return
}
if v := creds.Extra[credentialKeyRoomID]; v != "" {
s.roomID = v
}
if v := creds.Extra[credentialKeyCredentials]; v != "" {
s.credentials = v
}
if v := creds.Extra[credentialKeyRoomURL]; v != "" {
s.roomURL = v
}
if v := creds.Extra[credentialKeyTelemetryReferer]; v != "" {
s.telemetryReferer = v
}
}
func (s *Session) drainReconnectQueue() {
for {
select {
case <-s.reconnectCh:
default:
return
}
}
}
func (s *Session) queueReconnect() {
if s.closed.Load() || s.reconnecting.Load() {
return
}
if s.shouldReconnect != nil && !s.shouldReconnect() {
return
}
select {
case s.reconnectCh <- struct{}{}:
default:
}
}
func (s *Session) stopSession() {
s.stopTelemetry()
s.sessionMu.Lock()
closeSignal(s.keepAliveCh)
closeSignal(s.sessionCloseCh)
s.sessionMu.Unlock()
}
func (s *Session) resetSession() (chan struct{}, chan struct{}) {
s.sessionMu.Lock()
defer s.sessionMu.Unlock()
s.keepAliveCh = make(chan struct{})
s.sessionCloseCh = make(chan struct{})
return s.keepAliveCh, s.sessionCloseCh
}
func (s *Session) resetMediaState() {
s.subscriberReady.Store(false)
s.publisherReady.Store(false)
s.subscriberConn = make(chan struct{})
s.publisherConn = make(chan struct{})
}
func (s *Session) signalEnded(reason string) {
s.closed.Store(true)
s.stopTelemetry()
if s.onEnded != nil {
s.onEnded(reason)
}
}

View File

@@ -0,0 +1,318 @@
package goolom
import (
"fmt"
"strings"
"time"
"github.com/google/uuid"
"github.com/openlibrecommunity/olcrtc/internal/logger"
"github.com/pion/webrtc/v4"
)
func (s *Session) setupDataChannelHandlers(dcReady chan struct{}, sessionCloseCh chan struct{}) {
s.dc.OnOpen(func() {
numWorkers := 4
for i := range numWorkers {
s.wg.Add(1)
go func(workerID int) {
defer s.wg.Done()
s.processSendQueue(workerID, sessionCloseCh)
}(i)
}
close(dcReady)
})
s.dc.OnClose(s.onDataChannelClose)
s.dc.OnMessage(s.onDataChannelMessage)
s.pcSub.OnDataChannel(func(dc *webrtc.DataChannel) {
if s.onData != nil {
dc.OnMessage(s.onDataChannelMessage)
}
})
}
func (s *Session) onDataChannelClose() {
if !s.closed.Load() {
s.queueReconnect()
}
}
func (s *Session) onDataChannelMessage(msg webrtc.DataChannelMessage) {
if s.onData != nil && len(msg.Data) > 0 {
s.onData(msg.Data)
}
}
func (s *Session) handleSdpOffer(offer map[string]any, uid string, sendPub bool) error {
sdp, _ := offer["sdp"].(string)
pcSeq, _ := offer["pcSeq"].(float64)
if err := s.pcSub.SetRemoteDescription(webrtc.SessionDescription{
Type: webrtc.SDPTypeOffer,
SDP: sdp,
}); err != nil {
return fmt.Errorf("set remote desc: %w", err)
}
answer, err := s.pcSub.CreateAnswer(nil)
if err != nil {
return fmt.Errorf("create answer: %w", err)
}
if err := s.pcSub.SetLocalDescription(answer); err != nil {
return fmt.Errorf("set local desc: %w", err)
}
s.wsMu.Lock()
_ = s.ws.WriteJSON(map[string]any{
keyUID: uuid.New().String(),
"subscriberSdpAnswer": map[string]any{
keyPcSeq: int(pcSeq),
"sdp": answer.SDP,
},
})
s.wsMu.Unlock()
s.sendAck(uid)
if s.onData == nil {
if err := s.sendSetSlots(); err != nil {
logger.Debugf("setSlots error: %v", err)
}
}
if !sendPub {
return nil
}
time.Sleep(300 * time.Millisecond)
pubOffer, err := s.pcPub.CreateOffer(nil)
if err != nil {
return fmt.Errorf("create pub offer: %w", err)
}
if err := s.pcPub.SetLocalDescription(pubOffer); err != nil {
return fmt.Errorf("set local pub desc: %w", err)
}
s.wsMu.Lock()
_ = s.ws.WriteJSON(map[string]any{
keyUID: uuid.New().String(),
"publisherSdpOffer": map[string]any{
keyPcSeq: 1,
"sdp": pubOffer.SDP,
"tracks": s.publisherTrackDescriptions(),
},
})
s.wsMu.Unlock()
return nil
}
func (s *Session) handleSdpAnswer(answer map[string]any, uid string) {
sdp, _ := answer["sdp"].(string)
if err := s.pcPub.SetRemoteDescription(webrtc.SessionDescription{
Type: webrtc.SDPTypeAnswer,
SDP: sdp,
}); err != nil {
logger.Debugf("SetRemoteDescription error: %v", err)
}
s.sendAck(uid)
}
func (s *Session) handleICE(cand map[string]any) {
candStr, _ := cand["candidate"].(string)
target, _ := cand["target"].(string)
sdpMid, _ := cand["sdpMid"].(string)
sdpMLineIndex, _ := cand["sdpMlineIndex"].(float64)
parts := strings.Fields(candStr)
if len(parts) < 8 {
return
}
init := webrtc.ICECandidateInit{
Candidate: candStr,
SDPMid: &sdpMid,
SDPMLineIndex: func() *uint16 { v := uint16(sdpMLineIndex); return &v }(),
}
switch target {
case "SUBSCRIBER":
_ = s.pcSub.AddICECandidate(init)
case "PUBLISHER":
_ = s.pcPub.AddICECandidate(init)
}
}
func (s *Session) setupICEHandlers() {
s.pcSub.OnICECandidate(func(c *webrtc.ICECandidate) {
if c == nil {
return
}
init := c.ToJSON()
s.wsMu.Lock()
_ = s.ws.WriteJSON(map[string]any{
keyUID: uuid.New().String(),
"webrtcIceCandidate": map[string]any{
"candidate": init.Candidate,
"sdpMid": init.SDPMid,
"sdpMlineIndex": init.SDPMLineIndex,
"target": "SUBSCRIBER",
keyPcSeq: 1,
},
})
s.wsMu.Unlock()
})
s.pcPub.OnICECandidate(func(c *webrtc.ICECandidate) {
if c == nil {
return
}
init := c.ToJSON()
s.wsMu.Lock()
_ = s.ws.WriteJSON(map[string]any{
keyUID: uuid.New().String(),
"webrtcIceCandidate": map[string]any{
"candidate": init.Candidate,
"sdpMid": init.SDPMid,
"sdpMlineIndex": init.SDPMLineIndex,
"target": "PUBLISHER",
keyPcSeq: 1,
},
})
s.wsMu.Unlock()
})
}
func (s *Session) sendSetSlots() error {
s.wsMu.Lock()
defer s.wsMu.Unlock()
// Goolom only forwards as many remote videos as the subscriber asks for via
// setSlots. Request a generous count so each subscriber sees every active
// publisher in the room.
slots := make([]map[string]int, 0, 8)
for range 8 {
slots = append(slots, map[string]int{"width": 1280, "height": 720})
}
if err := s.ws.WriteJSON(map[string]any{
keyUID: uuid.New().String(),
"setSlots": map[string]any{
"slots": slots,
"audioSlotsCount": 0,
"key": 1,
"shutdownAllVideo": nil,
"withSelfView": false,
"selfViewVisibility": "ON_LOADING_THEN_SHOW",
"gridConfig": map[string]any{},
},
}); err != nil {
return fmt.Errorf("write set slots: %w", err)
}
return nil
}
func (s *Session) publisherTrackDescriptions() []map[string]any {
if s.pcPub == nil {
return nil
}
tracks := make([]map[string]any, 0)
for _, transceiver := range s.pcPub.GetTransceivers() {
sender := transceiver.Sender()
if sender == nil {
continue
}
track := sender.Track()
if track == nil {
continue
}
kind := "VIDEO"
if track.Kind() == webrtc.RTPCodecTypeAudio {
kind = "AUDIO"
}
tracks = append(tracks, map[string]any{
"mid": transceiver.Mid(),
"transceiverMid": transceiver.Mid(),
"kind": kind,
"priority": 0,
"label": track.ID(),
"codecs": map[string]any{},
"groupId": 1,
keyDescription: "",
})
}
return tracks
}
func isNonTURNURL(url string) bool {
return url != "" && !strings.HasPrefix(url, "turn:") && !strings.HasPrefix(url, "turns:")
}
func parseICEURLs(server map[string]any) []string {
var urls []string
switch rawURLs := server["urls"].(type) {
case []any:
for _, rawURL := range rawURLs {
if url, ok := rawURL.(string); ok && isNonTURNURL(url) {
urls = append(urls, url)
}
}
case []string:
for _, url := range rawURLs {
if isNonTURNURL(url) {
urls = append(urls, url)
}
}
}
return urls
}
func parseICEServer(rawServer any) (webrtc.ICEServer, bool) {
server, ok := rawServer.(map[string]any)
if !ok {
return webrtc.ICEServer{}, false
}
urls := parseICEURLs(server)
if len(urls) == 0 {
return webrtc.ICEServer{}, false
}
ice := webrtc.ICEServer{URLs: urls}
if username, ok := server["username"].(string); ok {
ice.Username = username
}
if credential, ok := server["credential"].(string); ok {
ice.Credential = credential
}
return ice, true
}
func (s *Session) applyServerHelloConfig(serverHello map[string]any) {
rawCfg, ok := serverHello["rtcConfiguration"].(map[string]any)
if !ok {
return
}
rawServers, ok := rawCfg["iceServers"].([]any)
if !ok || len(rawServers) == 0 {
return
}
iceServers := make([]webrtc.ICEServer, 0, len(rawServers))
for _, rawServer := range rawServers {
if ice, ok := parseICEServer(rawServer); ok {
iceServers = append(iceServers, ice)
}
}
if len(iceServers) == 0 {
return
}
cfg := webrtc.Configuration{
ICEServers: iceServers,
SDPSemantics: webrtc.SDPSemanticsUnifiedPlan,
}
if s.pcSub != nil {
_ = s.pcSub.SetConfiguration(cfg)
}
if s.pcPub != nil {
_ = s.pcPub.SetConfiguration(cfg)
}
}

View File

@@ -0,0 +1,322 @@
// Package goolom implements an engine.Session backed by the Goolom SFU
// signaling protocol. Goolom is the proprietary SFU developed for Yandex
// Telemost; the on-wire protocol — capabilities offer, separated subscriber
// and publisher PeerConnections, ack/pong keepalive, slots-based subscribe
// model — is what this engine speaks.
//
// HTTP auth (room-info lookup, telemetry referer, etc.) lives in the auth
// package; this engine consumes a media-server WebSocket URL plus the
// peer/room/credentials tuple supplied as engine.Config.
package goolom
import (
"context"
"errors"
"fmt"
"net/http"
"sync"
"sync/atomic"
"time"
"github.com/gorilla/websocket"
"github.com/openlibrecommunity/olcrtc/internal/engine"
"github.com/pion/webrtc/v4"
)
const (
realDataChannelMessageLimit = 12288
defaultSendDelayLow = 2 * time.Millisecond
defaultSendDelayMax = 12 * time.Millisecond
defaultTelemetryInterval = 20 * time.Second
defaultSendQueueSize = 5000
defaultBufferHighWaterMark = 512 * 1024
defaultSendQueueCapHard = 4000
wsReadTimeout = 60 * time.Second
wsHandshakeTimeout = 15 * time.Second
keyUID = "uid"
keyDescription = "description"
keyPcSeq = "pcSeq"
keyName = "name"
stateTerminated = "terminated"
credentialKeyRoomID = "roomID"
credentialKeyCredentials = "credentials"
credentialKeyRoomURL = "roomURL"
credentialKeyTelemetryReferer = "telemetryReferer"
)
var (
// ErrDataChannelTimeout is returned when the DataChannel fails to open in time.
ErrDataChannelTimeout = errors.New("datachannel timeout")
// ErrDataChannelNotReady is returned when send is called before the DataChannel is open.
ErrDataChannelNotReady = errors.New("datachannel not ready")
// ErrSendQueueClosed is returned when send is called after Close.
ErrSendQueueClosed = errors.New("send queue closed")
// ErrSendQueueTimeout is returned when the send queue cannot accept new data in time.
ErrSendQueueTimeout = errors.New("send queue timeout")
// ErrSessionClosed is returned when the session is closed mid-operation.
ErrSessionClosed = errors.New("session closed")
// ErrPeerClosed is returned when the peer is closed mid-operation.
ErrPeerClosed = errors.New("peer closed")
// ErrSubscriberMediaTimeout is returned when the subscriber media is not ready in time.
ErrSubscriberMediaTimeout = errors.New("subscriber media timeout")
// ErrPublisherNotInitialized is returned when the publisher PC is not set up.
ErrPublisherNotInitialized = errors.New("publisher peer connection not initialized")
// ErrURLRequired is returned when no media-server WebSocket URL was supplied.
ErrURLRequired = errors.New("goolom media server URL required")
// ErrRoomIDRequired is returned when no room ID was supplied.
ErrRoomIDRequired = errors.New("goolom room ID required")
// ErrPeerIDRequired is returned when no peer ID was supplied.
ErrPeerIDRequired = errors.New("goolom peer ID required")
// ErrNoRefresh is returned when reconnect is attempted without a refresh callback.
ErrNoRefresh = errors.New("goolom reconnect: no refresh callback supplied")
)
// TrafficShape controls outgoing data-channel pacing.
type TrafficShape struct {
MaxMessageSize int
MinDelay time.Duration
MaxDelay time.Duration
}
// Session is the Goolom engine handle.
type Session struct {
name string
mediaServerURL string
peerID string
roomID string
credentials string
roomURL string // referer for telemetry — opaque to the engine
telemetryReferer string
refresh func(ctx context.Context) (engine.Credentials, error)
ws *websocket.Conn
wsMu sync.Mutex
pcSub *webrtc.PeerConnection
pcPub *webrtc.PeerConnection
dc *webrtc.DataChannel
onData func([]byte)
onReconnect func(*webrtc.DataChannel)
shouldReconnect func() bool
onEnded func(string)
reconnectCh chan struct{}
closeCh chan struct{}
keepAliveCh chan struct{}
telemetryCh chan struct{}
sessionCloseCh chan struct{}
lastReconnect time.Time
reconnectCount int
sessionMu sync.Mutex
sendQueue chan []byte
sendQueueClosed atomic.Bool
closed atomic.Bool
reconnecting atomic.Bool
telemetryActive atomic.Bool
ackMu sync.Mutex
ackWaiters map[string]chan struct{}
trafficShape TrafficShape
videoTrackMu sync.RWMutex
videoTracks []webrtc.TrackLocal
onVideoTrack func(*webrtc.TrackRemote, *webrtc.RTPReceiver)
subscriberReady atomic.Bool
publisherReady atomic.Bool
subscriberConn chan struct{}
publisherConn chan struct{}
wg sync.WaitGroup
httpClient *http.Client
}
// New creates a new Goolom engine session.
//
// cfg.URL is the media server WebSocket URL. cfg.Token carries the peer ID.
// cfg.Extra carries the rest of the room tuple: roomID, credentials, and an
// optional roomURL / telemetryReferer string the engine uses verbatim as the
// Referer header for telemetry posts.
func New(_ context.Context, cfg engine.Config) (engine.Session, error) {
if cfg.URL == "" {
return nil, ErrURLRequired
}
peerID := cfg.Token
if peerID == "" {
return nil, ErrPeerIDRequired
}
roomID := ""
credentials := ""
roomURL := ""
telemetryReferer := ""
if cfg.Extra != nil {
roomID = cfg.Extra[credentialKeyRoomID]
credentials = cfg.Extra[credentialKeyCredentials]
roomURL = cfg.Extra[credentialKeyRoomURL]
telemetryReferer = cfg.Extra[credentialKeyTelemetryReferer]
}
if roomID == "" {
return nil, ErrRoomIDRequired
}
if telemetryReferer == "" {
telemetryReferer = roomURL
}
return &Session{
name: cfg.Name,
mediaServerURL: cfg.URL,
peerID: peerID,
roomID: roomID,
credentials: credentials,
roomURL: roomURL,
telemetryReferer: telemetryReferer,
refresh: cfg.Refresh,
onData: cfg.OnData,
reconnectCh: make(chan struct{}, 1),
closeCh: make(chan struct{}),
keepAliveCh: make(chan struct{}),
sessionCloseCh: make(chan struct{}),
telemetryCh: make(chan struct{}, 1),
sendQueue: make(chan []byte, defaultSendQueueSize),
ackWaiters: make(map[string]chan struct{}),
subscriberConn: make(chan struct{}),
publisherConn: make(chan struct{}),
trafficShape: TrafficShape{
MaxMessageSize: realDataChannelMessageLimit,
MinDelay: defaultSendDelayLow,
MaxDelay: defaultSendDelayMax,
},
httpClient: nil,
}, nil
}
// Capabilities reports what this engine can do.
func (s *Session) Capabilities() engine.Capabilities {
return engine.Capabilities{ByteStream: true, VideoTrack: true}
}
// SetTrafficShape adjusts the outgoing data-channel pacing.
func (s *Session) SetTrafficShape(shape TrafficShape) {
if shape.MaxMessageSize <= 0 {
shape.MaxMessageSize = realDataChannelMessageLimit
}
if shape.MaxDelay < shape.MinDelay {
shape.MaxDelay = shape.MinDelay
}
s.trafficShape = shape
}
// Send queues data for transmission.
func (s *Session) Send(data []byte) error {
if s.dc == nil || s.dc.ReadyState() != webrtc.DataChannelStateOpen {
return ErrDataChannelNotReady
}
if s.sendQueueClosed.Load() {
return ErrSendQueueClosed
}
select {
case s.sendQueue <- data:
return nil
case <-time.After(50 * time.Millisecond):
return ErrSendQueueTimeout
}
}
// GetSendQueue returns the transmission queue.
func (s *Session) GetSendQueue() chan []byte { return s.sendQueue }
// GetBufferedAmount returns the WebRTC buffered amount.
func (s *Session) GetBufferedAmount() uint64 {
if s.dc != nil {
return s.dc.BufferedAmount()
}
return 0
}
// SetEndedCallback sets the callback for connection termination.
func (s *Session) SetEndedCallback(cb func(string)) { s.onEnded = cb }
// SetReconnectCallback sets the callback for reconnection events.
func (s *Session) SetReconnectCallback(cb func(*webrtc.DataChannel)) { s.onReconnect = cb }
// SetShouldReconnect sets the policy for reconnection.
func (s *Session) SetShouldReconnect(fn func() bool) { s.shouldReconnect = fn }
// CanSend checks if data can be sent.
func (s *Session) CanSend() bool {
if s.onData == nil {
if s.hasLocalVideoTracks() {
return !s.closed.Load() && s.subscriberReady.Load() && s.publisherReady.Load()
}
return !s.closed.Load() && s.subscriberReady.Load()
}
if s.dc == nil || s.dc.ReadyState() != webrtc.DataChannelStateOpen {
return false
}
return len(s.sendQueue) < defaultSendQueueCapHard
}
// AddVideoTrack adds a video track to the publisher peer connection.
func (s *Session) AddVideoTrack(track webrtc.TrackLocal) error {
s.videoTrackMu.Lock()
s.videoTracks = append(s.videoTracks, track)
s.videoTrackMu.Unlock()
if s.pcPub == nil {
return nil
}
if _, err := s.pcPub.AddTrack(track); err != nil {
return fmt.Errorf("failed to add track: %w", err)
}
return nil
}
// SetVideoTrackHandler registers a callback for remote video tracks.
func (s *Session) SetVideoTrackHandler(cb func(*webrtc.TrackRemote, *webrtc.RTPReceiver)) {
s.videoTrackMu.Lock()
defer s.videoTrackMu.Unlock()
s.onVideoTrack = cb
}
func (s *Session) hasLocalVideoTracks() bool {
s.videoTrackMu.RLock()
defer s.videoTrackMu.RUnlock()
return len(s.videoTracks) > 0
}
func (s *Session) videoTrackHandler() func(*webrtc.TrackRemote, *webrtc.RTPReceiver) {
s.videoTrackMu.RLock()
defer s.videoTrackMu.RUnlock()
return s.onVideoTrack
}
func (s *Session) attachPendingVideoTracks() error {
s.videoTrackMu.RLock()
defer s.videoTrackMu.RUnlock()
for _, track := range s.videoTracks {
if _, err := s.pcPub.AddTrack(track); err != nil {
return fmt.Errorf("add video track: %w", err)
}
}
return nil
}
func closeSignal(ch chan struct{}) {
if ch == nil {
return
}
select {
case <-ch:
default:
close(ch)
}
}
func init() { //nolint:gochecknoinits // engine registration is the canonical Go pattern for plugins
engine.Register("goolom", New)
}

View File

@@ -0,0 +1,303 @@
package goolom
import (
"context"
"fmt"
"runtime"
"strings"
"time"
"github.com/google/uuid"
"github.com/gorilla/websocket"
"github.com/openlibrecommunity/olcrtc/internal/logger"
)
func (s *Session) sendHello() error {
hello := map[string]any{
keyUID: uuid.New().String(),
"hello": map[string]any{
"participantMeta": map[string]any{
keyName: s.name,
"role": "SPEAKER",
keyDescription: "",
"sendAudio": false,
"sendVideo": s.hasLocalVideoTracks(),
},
"participantAttributes": map[string]any{
keyName: s.name,
"role": "SPEAKER",
keyDescription: "",
},
"sendAudio": false,
"sendVideo": s.hasLocalVideoTracks(),
"sendSharing": false,
"participantId": s.peerID,
"roomId": s.roomID,
"serviceName": "telemost",
"credentials": s.credentials,
"capabilitiesOffer": goolomCapabilitiesOffer(),
"sdkInfo": map[string]any{
"implementation": "browser",
"version": "5.27.0",
"userAgent": "Mozilla/5.0 (X11; Linux x86_64; rv:149.0) Gecko/20100101 Firefox/149.0",
"hwConcurrency": runtime.NumCPU(),
},
"sdkInitializationId": uuid.New().String(),
"disablePublisher": !s.hasLocalVideoTracks(),
"disableSubscriber": false,
"disableSubscriberAudio": true,
},
}
s.wsMu.Lock()
defer s.wsMu.Unlock()
if err := s.ws.WriteJSON(hello); err != nil {
return fmt.Errorf("write hello: %w", err)
}
return nil
}
func (s *Session) handleSignaling(ctx context.Context) {
pubSent := false
for {
var msg map[string]any
if err := s.ws.ReadJSON(&msg); err != nil {
if !s.closed.Load() {
logger.Debugf("ws read error: %v", err)
s.queueReconnect()
}
return
}
s.updateWSDeadline()
uid, _ := msg[keyUID].(string)
s.handleMessageEvents(ctx, msg, uid)
if isConferenceEndMessage(msg) {
s.signalEnded("conference ended")
return
}
if offer, ok := msg["subscriberSdpOffer"].(map[string]any); ok {
if err := s.handleSdpOffer(offer, uid, !pubSent); err != nil {
logger.Debugf("sdp offer error: %v", err)
continue
}
pubSent = true
}
s.handleSignalingResponses(msg, uid)
}
}
func (s *Session) handleMessageEvents(ctx context.Context, msg map[string]any, uid string) {
if _, ok := msg["ack"]; ok {
s.resolveAck(uid)
}
if serverHello, ok := msg["serverHello"].(map[string]any); ok {
s.applyServerHelloConfig(serverHello)
s.startTelemetry(ctx, serverHello)
s.sendAck(uid)
}
s.handleCommonMessages(msg, uid)
}
func (s *Session) handleSignalingResponses(msg map[string]any, uid string) {
if answer, ok := msg["publisherSdpAnswer"].(map[string]any); ok {
s.handleSdpAnswer(answer, uid)
}
if cand, ok := msg["webrtcIceCandidate"].(map[string]any); ok {
s.handleICE(cand)
}
}
func (s *Session) updateWSDeadline() {
s.wsMu.Lock()
if s.ws != nil {
_ = s.ws.SetReadDeadline(time.Now().Add(wsReadTimeout))
}
s.wsMu.Unlock()
}
func (s *Session) handleCommonMessages(msg map[string]any, uid string) {
if _, ok := msg["updateDescription"]; ok {
s.sendAck(uid)
}
if _, ok := msg["vadActivity"]; ok {
s.sendAck(uid)
}
if _, ok := msg["ping"]; ok {
s.sendPong(uid)
}
if _, ok := msg["pong"]; ok {
s.sendAck(uid)
}
}
func (s *Session) sendAck(uid string) {
if uid == "" {
return
}
s.wsMu.Lock()
defer s.wsMu.Unlock()
_ = s.ws.WriteJSON(map[string]any{
keyUID: uid,
"ack": map[string]any{
"status": map[string]any{"code": "OK"},
},
})
}
func (s *Session) sendPong(uid string) {
s.wsMu.Lock()
defer s.wsMu.Unlock()
_ = s.ws.WriteJSON(map[string]any{
keyUID: uid,
"pong": map[string]any{},
})
}
func (s *Session) registerAckWaiter(uid string) chan struct{} {
ch := make(chan struct{})
s.ackMu.Lock()
s.ackWaiters[uid] = ch
s.ackMu.Unlock()
return ch
}
func (s *Session) removeAckWaiter(uid string) {
s.ackMu.Lock()
delete(s.ackWaiters, uid)
s.ackMu.Unlock()
}
func (s *Session) waitForAck(uid string, ch <-chan struct{}, timeout time.Duration) bool {
if uid == "" {
return false
}
defer s.removeAckWaiter(uid)
select {
case <-ch:
return true
case <-time.After(timeout):
return false
case <-s.closeCh:
return false
}
}
func (s *Session) resolveAck(uid string) {
if uid == "" {
return
}
s.ackMu.Lock()
ch := s.ackWaiters[uid]
if ch != nil {
delete(s.ackWaiters, uid)
close(ch)
}
s.ackMu.Unlock()
}
func (s *Session) sendLeave(uid string) bool {
s.wsMu.Lock()
defer s.wsMu.Unlock()
if s.ws == nil {
return false
}
leave := map[string]any{
keyUID: uid,
"leave": map[string]any{},
}
if err := s.ws.WriteJSON(leave); err != nil {
return false
}
return true
}
func (s *Session) keepAlive(keepAliveCh <-chan struct{}) {
wsTicker := time.NewTicker(30 * time.Second)
defer wsTicker.Stop()
appTicker := time.NewTicker(5 * time.Second)
defer appTicker.Stop()
for {
select {
case <-wsTicker.C:
if !s.sendWSPing() {
return
}
case <-appTicker.C:
if !s.sendAppPing() {
return
}
case <-keepAliveCh:
return
case <-s.closeCh:
return
}
}
}
func (s *Session) sendWSPing() bool {
s.wsMu.Lock()
defer s.wsMu.Unlock()
if s.ws != nil {
if err := s.ws.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(10*time.Second)); err != nil {
logger.Debugf("ws ping error: %v", err)
s.queueReconnect()
return false
}
}
return true
}
func (s *Session) sendAppPing() bool {
s.wsMu.Lock()
defer s.wsMu.Unlock()
if s.ws != nil {
if err := s.ws.WriteJSON(map[string]any{
keyUID: uuid.New().String(),
"ping": map[string]any{},
}); err != nil {
logger.Debugf("app ping error: %v", err)
s.queueReconnect()
return false
}
}
return true
}
func isConferenceEndMessage(msg map[string]any) bool {
for _, key := range []string{"conferenceClosed", "conferenceEnded", "roomClosed", "roomEnded", "callEnded"} {
if _, ok := msg[key]; ok {
return true
}
}
if raw, ok := msg["conference"].(map[string]any); ok {
if state, _ := raw["state"].(string); isEndedState(state) {
return true
}
}
if raw, ok := msg["conferenceState"].(map[string]any); ok {
if state, _ := raw["state"].(string); isEndedState(state) {
return true
}
}
return false
}
func isEndedState(state string) bool {
switch strings.ToLower(state) {
case "closed", "ended", "finished", stateTerminated:
return true
default:
return false
}
}

View File

@@ -0,0 +1,246 @@
package goolom
import (
"bytes"
"context"
"encoding/json"
"math/rand/v2"
"net/http"
"time"
"github.com/google/uuid"
"github.com/openlibrecommunity/olcrtc/internal/logger"
"github.com/openlibrecommunity/olcrtc/internal/protect"
)
func (s *Session) processSendQueue(workerID int, sessionCloseCh <-chan struct{}) {
for {
select {
case <-sessionCloseCh:
return
case <-s.closeCh:
return
case data := <-s.sendQueue:
if len(data) > s.trafficShape.MaxMessageSize {
logger.Debugf("oversized message size=%d limit=%d", len(data), s.trafficShape.MaxMessageSize)
continue
}
waited, err := s.waitBufferedAmount(workerID, sessionCloseCh)
if err != nil {
return
}
if waited > 0 {
logger.Verbosef("[WORKER-%d] Drained after %v", workerID, waited)
}
if err := s.dc.Send(data); err != nil {
logger.Debugf("send error: %v", err)
s.queueReconnect()
return
}
if s.trafficShape.MinDelay > 0 {
time.Sleep(s.calculateDelay())
}
}
}
}
func (s *Session) waitBufferedAmount(workerID int, sessionCloseCh <-chan struct{}) (time.Duration, error) {
start := time.Now()
for s.dc.BufferedAmount() > defaultBufferHighWaterMark {
select {
case <-sessionCloseCh:
return 0, ErrSessionClosed
case <-s.closeCh:
return 0, ErrPeerClosed
case <-time.After(10 * time.Millisecond):
if time.Since(start) > 5*time.Second {
logger.Debugf("buffer wait timeout worker=%d", workerID)
return time.Since(start), nil
}
}
}
return time.Since(start), nil
}
func (s *Session) calculateDelay() time.Duration {
minDelay := s.trafficShape.MinDelay
maxDelay := s.trafficShape.MaxDelay
if maxDelay <= minDelay {
return minDelay
}
return minDelay + time.Duration(rand.Int64N(int64(maxDelay-minDelay))) //nolint:gosec,lll // G404: non-cryptographic shaping randomness
}
func (s *Session) startTelemetry(ctx context.Context, serverHello map[string]any) {
endpoint, interval, ok := parseTelemetryCfg(serverHello)
if !ok {
return
}
if !s.telemetryActive.CompareAndSwap(false, true) {
return
}
s.wg.Add(1)
go func() {
defer s.wg.Done()
defer s.telemetryActive.Store(false)
ticker := time.NewTicker(interval)
defer ticker.Stop()
s.sendTelemetry(ctx, endpoint, "join")
for {
select {
case <-ticker.C:
s.sendTelemetry(ctx, endpoint, "stats")
case <-s.telemetryCh:
s.sendTelemetry(ctx, endpoint, "leave")
return
case <-s.closeCh:
s.sendTelemetry(ctx, endpoint, "leave")
return
}
}
}()
}
func parseTelemetryCfg(serverHello map[string]any) (string, time.Duration, bool) {
cfg, ok := serverHello["telemetryConfiguration"].(map[string]any)
if !ok {
return "", 0, false
}
endpoint, ok := cfg["logEndpoint"].(string)
if !ok || endpoint == "" {
endpoint, ok = cfg["endpoint"].(string)
if !ok || endpoint == "" {
endpoint, _ = cfg["url"].(string)
}
}
if endpoint == "" {
return "", 0, false
}
interval := defaultTelemetryInterval
if raw, ok := cfg["sendingInterval"].(float64); ok && raw > 0 {
interval = time.Duration(raw) * time.Millisecond
}
return endpoint, interval, true
}
func (s *Session) stopTelemetry() {
if s.telemetryActive.Load() {
select {
case s.telemetryCh <- struct{}{}:
default:
}
}
}
func (s *Session) sendTelemetry(ctx context.Context, endpoint, event string) {
body, err := json.Marshal(map[string]any{
"event": event,
"timestamp": time.Now().UnixMilli(),
"peerId": s.peerID,
"roomId": s.roomID,
"displayName": s.name,
"implementation": "olcrtc-go",
"dataChannel": map[string]any{
"bufferedAmount": s.GetBufferedAmount(),
"sendQueue": len(s.sendQueue),
},
})
if err != nil {
return
}
req, err := http.NewRequestWithContext(ctx, http.MethodPost, endpoint, bytes.NewReader(body))
if err != nil {
logger.Verbosef("Telemetry req error: %v", err)
return
}
req.Header.Set("Content-Type", "application/json")
req.Header.Set("User-Agent", "Mozilla/5.0 (X11; Linux x86_64; rv:149.0) Gecko/20100101 Firefox/149.0")
if s.telemetryReferer != "" {
req.Header.Set("Referer", s.telemetryReferer)
}
req.Header.Set("X-Requested-With", "XMLHttpRequest")
req.Header.Set("Client-Instance-Id", uuid.New().String())
req.Header.Set("X-Telemost-Client-Version", "187.1.0")
req.Header.Set("Idempotency-Key", uuid.New().String())
client := protect.NewHTTPClient()
resp, err := client.Do(req)
if err != nil {
logger.Verbosef("Telemetry send error: %v", err)
return
}
defer func() { _ = resp.Body.Close() }()
}
func goolomCapabilitiesOffer() map[string]any {
return map[string]any{
"offerAnswerMode": []string{"SEPARATE"},
"initialSubscriberOffer": []string{"ON_HELLO"},
"slotsMode": []string{"FROM_CONTROLLER"},
"simulcastMode": []string{"DISABLED", "STATIC"},
"selfVadStatus": []string{"FROM_SERVER", "FROM_CLIENT"},
"dataChannelSharing": []string{"TO_RTP"},
"videoEncoderConfig": []string{"NO_CONFIG", "ONLY_INIT_CONFIG", "RUNTIME_CONFIG"},
"dataChannelVideoCodec": []string{"VP8", "UNIQUE_CODEC_FROM_TRACK_DESCRIPTION"},
"bandwidthLimitationReason": []string{
"BANDWIDTH_REASON_DISABLED",
"BANDWIDTH_REASON_ENABLED",
},
"sdkDefaultDeviceManagement": []string{
"SDK_DEFAULT_DEVICE_MANAGEMENT_DISABLED",
"SDK_DEFAULT_DEVICE_MANAGEMENT_ENABLED",
},
"joinOrderLayout": []string{"JOIN_ORDER_LAYOUT_DISABLED", "JOIN_ORDER_LAYOUT_ENABLED"},
"pinLayout": []string{"PIN_LAYOUT_DISABLED"},
"sendSelfViewVideoSlot": []string{
"SEND_SELF_VIEW_VIDEO_SLOT_DISABLED",
"SEND_SELF_VIEW_VIDEO_SLOT_ENABLED",
},
"serverLayoutTransition": []string{"SERVER_LAYOUT_TRANSITION_DISABLED"},
"sdkPublisherOptimizeBitrate": []string{
"SDK_PUBLISHER_OPTIMIZE_BITRATE_DISABLED",
"SDK_PUBLISHER_OPTIMIZE_BITRATE_FULL",
"SDK_PUBLISHER_OPTIMIZE_BITRATE_ONLY_SELF",
},
"sdkNetworkLostDetection": []string{"SDK_NETWORK_LOST_DETECTION_DISABLED"},
"sdkNetworkPathMonitor": []string{"SDK_NETWORK_PATH_MONITOR_DISABLED"},
"publisherVp9": []string{"PUBLISH_VP9_DISABLED", "PUBLISH_VP9_ENABLED"},
"svcMode": []string{"SVC_MODE_DISABLED", "SVC_MODE_L3T3", "SVC_MODE_L3T3_KEY"},
"subscriberOfferAsyncAck": []string{"SUBSCRIBER_OFFER_ASYNC_ACK_DISABLED", "SUBSCRIBER_OFFER_ASYNC_ACK_ENABLED"},
"androidBluetoothRoutingFix": []string{
"ANDROID_BLUETOOTH_ROUTING_FIX_DISABLED",
},
"fixedIceCandidatesPoolSize": []string{
"FIXED_ICE_CANDIDATES_POOL_SIZE_DISABLED",
},
"sdkAndroidTelecomIntegration": []string{
"SDK_ANDROID_TELECOM_INTEGRATION_DISABLED",
},
"setActiveCodecsMode": []string{
"SET_ACTIVE_CODECS_MODE_DISABLED",
"SET_ACTIVE_CODECS_MODE_VIDEO_ONLY",
},
"subscriberDtlsPassiveMode": []string{
"SUBSCRIBER_DTLS_PASSIVE_MODE_DISABLED",
},
"publisherOpusDred": []string{
"PUBLISHER_OPUS_DRED_DISABLED",
},
"publisherOpusLowBitrate": []string{
"PUBLISHER_OPUS_LOW_BITRATE_DISABLED",
},
"sdkAndroidDestroySessionOnTaskRemoved": []string{
"SDK_ANDROID_DESTROY_SESSION_ON_TASK_REMOVED_DISABLED",
},
"svcModes": []string{"FALSE"},
"reportTelemetryModes": []string{"TRUE"},
"keepDefaultDevicesModes": []string{"FALSE"},
}
}

View File

@@ -1,50 +0,0 @@
// Package provider defines the interface and registry for different WebRTC providers.
package provider
import (
"context"
"errors"
"github.com/pion/webrtc/v4"
)
var (
// ErrDataChannelTimeout is returned when the DataChannel fails to open within the timeout period.
ErrDataChannelTimeout = errors.New("datachannel timeout")
// ErrDataChannelNotReady is returned when attempting to send data before the DataChannel is open.
ErrDataChannelNotReady = errors.New("datachannel not ready")
// ErrSendQueueClosed is returned when attempting to send data after the send queue has been closed.
ErrSendQueueClosed = errors.New("send queue closed")
// ErrSendQueueTimeout is returned when the send queue is full and the timeout is reached.
ErrSendQueueTimeout = errors.New("send queue timeout")
)
// Provider defines the standard interface for WebRTC connection handlers.
type Provider interface {
Connect(ctx context.Context) error
Send(data []byte) error
Close() error
SetReconnectCallback(cb func(*webrtc.DataChannel))
SetShouldReconnect(fn func() bool)
SetEndedCallback(cb func(string))
WatchConnection(ctx context.Context)
CanSend() bool
GetSendQueue() chan []byte
GetBufferedAmount() uint64
}
// VideoTrackCapable is implemented by providers that can exchange video tracks.
type VideoTrackCapable interface {
AddVideoTrack(track webrtc.TrackLocal) error
SetVideoTrackHandler(cb func(*webrtc.TrackRemote, *webrtc.RTPReceiver))
}
// Config holds common configuration for all providers.
type Config struct {
RoomURL string
Name string
OnData func([]byte)
DNSServer string
ProxyAddr string
ProxyPort int
}

View File

@@ -1,83 +0,0 @@
package telemost
import (
"context"
"encoding/json"
"errors"
"net/http"
"net/http/httptest"
"strings"
"testing"
)
func withTelemostAPIServer(t *testing.T, h http.Handler) {
t.Helper()
old := apiBase
srv := httptest.NewServer(h)
t.Cleanup(func() {
apiBase = old
srv.Close()
})
apiBase = srv.URL
}
func TestGetConnectionInfo(t *testing.T) {
withTelemostAPIServer(t, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodGet {
t.Fatalf("method = %s", r.Method)
}
if !strings.Contains(r.URL.EscapedPath(), "/conferences/room%2Fid/connection") {
t.Fatalf("path = %q escaped=%q", r.URL.Path, r.URL.EscapedPath())
}
if r.URL.Query().Get("display_name") != "peer" {
t.Fatalf("display_name query = %q", r.URL.Query().Get("display_name"))
}
_ = json.NewEncoder(w).Encode(ConnectionInfo{
RoomID: "room", //nolint:goconst // test literal, repetition is intentional
PeerID: "peer-id", //nolint:goconst // test literal, repetition is intentional
Credentials: "creds", //nolint:goconst // test literal, repetition is intentional
})
}))
info, err := GetConnectionInfo(context.Background(), "room/id", "peer")
if err != nil {
t.Fatalf("GetConnectionInfo() error = %v", err)
}
if info.RoomID != "room" || info.PeerID != "peer-id" || info.Credentials != "creds" {
t.Fatalf("GetConnectionInfo() = %+v", info)
}
}
func TestGetConnectionInfoErrors(t *testing.T) {
withTelemostAPIServer(t, http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
http.Error(w, "bad", http.StatusForbidden)
}))
if _, err := GetConnectionInfo(context.Background(), "room", "peer"); !errors.Is(err, ErrAPI) {
t.Fatalf("GetConnectionInfo() error = %v, want %v", err, ErrAPI)
}
withTelemostAPIServer(t, http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
_, _ = w.Write([]byte("{"))
}))
if _, err := GetConnectionInfo(context.Background(), "room", "peer"); err == nil {
t.Fatal("GetConnectionInfo() unexpectedly accepted bad json")
}
}
func TestTelemostNewPeerUsesConnectionInfo(t *testing.T) {
withTelemostAPIServer(t, http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
_ = json.NewEncoder(w).Encode(ConnectionInfo{
RoomID: "room",
PeerID: "peer-id",
Credentials: "creds",
})
}))
p, err := NewPeer(context.Background(), "room", "name", nil)
if err != nil {
t.Fatalf("NewPeer() error = %v", err)
}
if p.roomURL != "room" || p.name != "name" || p.conn.PeerID != "peer-id" || p.sendQueue == nil {
t.Fatalf("NewPeer() = %+v", p)
}
}

File diff suppressed because it is too large Load Diff

View File

@@ -1,196 +0,0 @@
package telemost
import (
"testing"
"time"
"github.com/pion/webrtc/v4"
)
func TestCloseSignal(t *testing.T) {
closeSignal(nil)
ch := make(chan struct{})
closeSignal(ch)
select {
case <-ch:
default:
t.Fatal("closeSignal() did not close channel")
}
closeSignal(ch)
}
func TestTrafficShapeAndDelay(t *testing.T) {
p := &Peer{}
p.SetTrafficShape(TrafficShape{MaxMessageSize: -1, MinDelay: 5 * time.Millisecond, MaxDelay: 2 * time.Millisecond})
if p.trafficShape.MaxMessageSize != realDataChannelMessageLimit {
t.Fatalf("MaxMessageSize = %d, want default", p.trafficShape.MaxMessageSize)
}
if p.trafficShape.MaxDelay != p.trafficShape.MinDelay {
t.Fatalf("MaxDelay = %v, want %v", p.trafficShape.MaxDelay, p.trafficShape.MinDelay)
}
if got := p.calculateDelay(); got != 5*time.Millisecond {
t.Fatalf("calculateDelay() = %v, want 5ms", got)
}
p.SetTrafficShape(TrafficShape{MaxMessageSize: 10, MinDelay: time.Millisecond, MaxDelay: 4 * time.Millisecond})
for range 20 {
got := p.calculateDelay()
if got < time.Millisecond || got >= 4*time.Millisecond {
t.Fatalf("calculateDelay() = %v, out of range", got)
}
}
}
func TestICEParsingFiltersTURN(t *testing.T) {
if isNonTURNURL("") || isNonTURNURL("turn:host") || isNonTURNURL("turns:host") {
t.Fatal("isNonTURNURL accepted empty or TURN URL")
}
if !isNonTURNURL("stun:host") {
t.Fatal("isNonTURNURL rejected STUN URL")
}
urls := parseICEURLs(map[string]interface{}{"urls": []interface{}{"turn:x", "stun:a", 123, "turns:y"}}) //nolint:goconst,lll // test literal, repetition is intentional
if len(urls) != 1 || urls[0] != "stun:a" {
t.Fatalf("parseICEURLs(interface) = %v, want [stun:a]", urls)
}
urls = parseICEURLs(map[string]interface{}{"urls": []string{"stun:a", "turn:b"}})
if len(urls) != 1 || urls[0] != "stun:a" {
t.Fatalf("parseICEURLs(strings) = %v, want [stun:a]", urls)
}
}
func TestParseICEServer(t *testing.T) {
if _, ok := parseICEServer("bad"); ok {
t.Fatal("parseICEServer() accepted non-map")
}
if _, ok := parseICEServer(map[string]interface{}{"urls": []interface{}{"turn:x"}}); ok {
t.Fatal("parseICEServer() accepted TURN-only server")
}
ice, ok := parseICEServer(map[string]interface{}{
"urls": []interface{}{"stun:a", "turn:b"},
"username": "user",
"credential": "pass",
})
if !ok {
t.Fatal("parseICEServer() ok = false")
}
if len(ice.URLs) != 1 || ice.URLs[0] != "stun:a" || ice.Username != "user" || ice.Credential != "pass" {
t.Fatalf("parseICEServer() = %+v", ice)
}
}
func TestConferenceEndParsing(t *testing.T) {
for _, msg := range []map[string]interface{}{
{"conferenceClosed": true},
{"conference": map[string]interface{}{"state": "ENDED"}}, //nolint:goconst // test literal, repetition is intentional
{"conferenceState": map[string]interface{}{"state": "terminated"}},
} {
if !isConferenceEndMessage(msg) {
t.Fatalf("isConferenceEndMessage(%v) = false", msg)
}
}
if isConferenceEndMessage(map[string]interface{}{"conference": map[string]interface{}{"state": "open"}}) {
t.Fatal("isConferenceEndMessage() accepted active conference")
}
for _, state := range []string{"closed", "ended", "finished", "terminated"} {
if !isEndedState(state) {
t.Fatalf("isEndedState(%q) = false", state)
}
}
if isEndedState("active") {
t.Fatal("isEndedState(active) = true")
}
}
//nolint:cyclop // table-driven test naturally has many branches
func TestPeerSmallStateHelpers(t *testing.T) {
p := &Peer{
reconnectCh: make(chan struct{}, 1),
closeCh: make(chan struct{}),
sendQueue: make(chan []byte, 2),
ackWaiters: make(map[string]chan struct{}),
}
p.SetEndedCallback(func(string) {})
if p.onEnded == nil {
t.Fatal("SetEndedCallback() did not store callback")
}
p.SetReconnectCallback(func(*webrtc.DataChannel) {})
if p.onReconnect == nil {
t.Fatal("SetReconnectCallback() did not store callback")
}
p.SetShouldReconnect(func() bool { return true })
if p.shouldReconnect == nil || !p.shouldReconnect() {
t.Fatal("SetShouldReconnect() did not store callback")
}
p.subscriberReady.Store(true)
if !p.CanSend() {
t.Fatal("CanSend() = false for subscriber-only ready peer")
}
p.closed.Store(true)
if p.CanSend() {
t.Fatal("CanSend() = true for closed peer")
}
ch := p.registerAckWaiter("uid-1")
p.resolveAck("uid-1")
select {
case <-ch:
default:
t.Fatal("resolveAck() did not close waiter")
}
if p.waitForAck("", make(chan struct{}), time.Millisecond) {
t.Fatal("waitForAck(empty uid) = true")
}
ch = p.registerAckWaiter("uid-2")
go p.resolveAck("uid-2")
if !p.waitForAck("uid-2", ch, time.Second) {
t.Fatal("waitForAck() = false after resolveAck")
}
if err := p.AddVideoTrack(nil); err != nil {
t.Fatalf("AddVideoTrack(nil) error = %v", err)
}
if !p.hasLocalVideoTracks() {
t.Fatal("hasLocalVideoTracks() = false after AddVideoTrack")
}
p.SetVideoTrackHandler(func(*webrtc.TrackRemote, *webrtc.RTPReceiver) {})
if p.videoTrackHandler() == nil {
t.Fatal("videoTrackHandler() = nil")
}
}
func TestTelemetryCfgParsing(t *testing.T) {
if _, _, ok := parseTelemetryCfg(map[string]interface{}{}); ok {
t.Fatal("parseTelemetryCfg() accepted missing config")
}
if _, _, ok := parseTelemetryCfg(map[string]interface{}{
"telemetryConfiguration": map[string]interface{}{}, //nolint:goconst // test literal, repetition is intentional
}); ok {
t.Fatal("parseTelemetryCfg() accepted missing endpoint")
}
endpoint, interval, ok := parseTelemetryCfg(map[string]interface{}{
"telemetryConfiguration": map[string]interface{}{
"endpoint": "https://example.test/log",
"sendingInterval": float64(250),
},
})
if !ok || endpoint != "https://example.test/log" || interval != 250*time.Millisecond {
t.Fatalf("parseTelemetryCfg() = (%q, %v, %v)", endpoint, interval, ok)
}
endpoint, interval, ok = parseTelemetryCfg(map[string]interface{}{
"telemetryConfiguration": map[string]interface{}{
"url": "https://example.test/url",
},
})
if !ok || endpoint != "https://example.test/url" || interval != defaultTelemetryInterval {
t.Fatalf("parseTelemetryCfg(default) = (%q, %v, %v)", endpoint, interval, ok)
}
}

View File

@@ -1,84 +0,0 @@
// Package telemost implements the Yandex Telemost WebRTC provider.
package telemost
import (
"context"
"fmt"
"github.com/openlibrecommunity/olcrtc/internal/provider"
"github.com/pion/webrtc/v4"
)
type telemostProvider struct {
peer *Peer
}
// New creates a new Telemost provider instance.
func New(ctx context.Context, cfg provider.Config) (provider.Provider, error) {
peer, err := NewPeer(ctx, cfg.RoomURL, cfg.Name, cfg.OnData)
if err != nil {
return nil, fmt.Errorf("create telemost peer: %w", err)
}
return &telemostProvider{peer: peer}, nil
}
// Connect starts the provider connection.
func (t *telemostProvider) Connect(ctx context.Context) error {
return t.peer.Connect(ctx)
}
// Send transmits data to the room.
func (t *telemostProvider) Send(data []byte) error {
return t.peer.Send(data)
}
// Close terminates the provider connection.
func (t *telemostProvider) Close() error {
return t.peer.Close()
}
// SetReconnectCallback sets the function to call on reconnection.
func (t *telemostProvider) SetReconnectCallback(cb func(*webrtc.DataChannel)) {
t.peer.SetReconnectCallback(cb)
}
// SetShouldReconnect sets the function to determine if reconnection should occur.
func (t *telemostProvider) SetShouldReconnect(fn func() bool) {
t.peer.SetShouldReconnect(fn)
}
// SetEndedCallback sets the function to call when the session ends.
func (t *telemostProvider) SetEndedCallback(cb func(string)) {
t.peer.SetEndedCallback(cb)
}
// WatchConnection monitors the provider connection state.
func (t *telemostProvider) WatchConnection(ctx context.Context) {
t.peer.WatchConnection(ctx)
}
// CanSend checks if the provider is ready to transmit data.
func (t *telemostProvider) CanSend() bool {
return t.peer.CanSend()
}
// GetSendQueue returns the data transmission queue.
func (t *telemostProvider) GetSendQueue() chan []byte {
return t.peer.GetSendQueue()
}
// GetBufferedAmount returns the current WebRTC buffered amount.
func (t *telemostProvider) GetBufferedAmount() uint64 {
return t.peer.GetBufferedAmount()
}
// AddVideoTrack adds a video track to the telemost connection.
func (t *telemostProvider) AddVideoTrack(track webrtc.TrackLocal) error {
return t.peer.AddVideoTrack(track)
}
// SetVideoTrackHandler registers a callback for subscribed remote video tracks.
func (t *telemostProvider) SetVideoTrackHandler(cb func(*webrtc.TrackRemote, *webrtc.RTPReceiver)) {
t.peer.SetVideoTrackHandler(cb)
}

View File

@@ -1,55 +0,0 @@
package telemost
import (
"context"
"errors"
"testing"
"github.com/pion/webrtc/v4"
)
//nolint:cyclop // table-driven test naturally has many branches
func TestTelemostProviderForwardsPeerMethods(t *testing.T) {
peer := &Peer{
reconnectCh: make(chan struct{}, 1),
closeCh: make(chan struct{}),
sendQueue: make(chan []byte, 1),
ackWaiters: make(map[string]chan struct{}),
}
p := &telemostProvider{peer: peer}
p.SetReconnectCallback(func(*webrtc.DataChannel) {})
p.SetShouldReconnect(func() bool { return true })
p.SetEndedCallback(func(string) {})
p.SetVideoTrackHandler(func(*webrtc.TrackRemote, *webrtc.RTPReceiver) {})
if peer.onReconnect == nil || peer.shouldReconnect == nil || peer.onEnded == nil || peer.onVideoTrack == nil {
t.Fatal("callbacks were not forwarded")
}
if p.GetSendQueue() != peer.sendQueue {
t.Fatal("GetSendQueue() did not forward")
}
if p.GetBufferedAmount() != 0 {
t.Fatal("GetBufferedAmount() != 0 with nil datachannel")
}
if err := p.AddVideoTrack(nil); err != nil {
t.Fatalf("AddVideoTrack(nil) error = %v", err)
}
if p.CanSend() {
t.Fatal("CanSend() = true for unready peer")
}
done := make(chan struct{})
go func() {
p.WatchConnection(context.Background())
close(done)
}()
if err := p.Close(); err != nil {
t.Fatalf("Close() error = %v", err)
}
<-done
if err := p.Send([]byte("x")); !errors.Is(err, ErrDataChannelNotReady) {
t.Fatalf("Send() error = %v, want datachannel not ready", err)
}
}

View File

@@ -1,85 +0,0 @@
package telemost
import (
"testing"
"time"
)
//nolint:cyclop // table-driven test naturally has many branches
func TestSessionReconnectAndEndedHelpers(t *testing.T) {
p := &Peer{
reconnectCh: make(chan struct{}, 2),
closeCh: make(chan struct{}),
keepAliveCh: make(chan struct{}),
sessionCloseCh: make(chan struct{}),
telemetryCh: make(chan struct{}, 1),
}
keepAliveCh, sessionCloseCh := p.resetSession()
if keepAliveCh == nil || sessionCloseCh == nil || keepAliveCh != p.keepAliveCh || sessionCloseCh != p.sessionCloseCh {
t.Fatal("resetSession() did not replace session channels")
}
p.subscriberReady.Store(true)
p.publisherReady.Store(true)
p.resetMediaState()
if p.subscriberReady.Load() || p.publisherReady.Load() || p.subscriberConn == nil || p.publisherConn == nil {
t.Fatal("resetMediaState() did not reset readiness")
}
p.queueReconnect()
select {
case <-p.reconnectCh:
default:
t.Fatal("queueReconnect() did not enqueue")
}
p.SetShouldReconnect(func() bool { return false })
p.queueReconnect()
select {
case <-p.reconnectCh:
t.Fatal("queueReconnect() enqueued despite policy=false")
default:
}
p.reconnectCh <- struct{}{}
p.reconnectCh <- struct{}{}
p.drainReconnectQueue()
select {
case <-p.reconnectCh:
t.Fatal("drainReconnectQueue() left queued item")
default:
}
p.telemetryActive.Store(true)
p.stopTelemetry()
select {
case <-p.telemetryCh:
default:
t.Fatal("stopTelemetry() did not signal active telemetry")
}
ended := ""
p.SetEndedCallback(func(reason string) { ended = reason })
p.signalEnded("done")
if !p.closed.Load() || ended != "done" {
t.Fatalf("signalEnded() closed=%v reason=%q", p.closed.Load(), ended)
}
}
func TestWaitForAckTimeoutAndClose(t *testing.T) {
p := &Peer{
closeCh: make(chan struct{}),
ackWaiters: make(map[string]chan struct{}),
}
ch := p.registerAckWaiter("timeout")
if p.waitForAck("timeout", ch, time.Millisecond) {
t.Fatal("waitForAck(timeout) = true")
}
ch = p.registerAckWaiter("closed")
close(p.closeCh)
if p.waitForAck("closed", ch, time.Second) {
t.Fatal("waitForAck(closeCh) = true")
}
}