refactor: migrate wbstream to engine/livekit + auth/wbstream

Split the WB Stream provider into two orthogonal pieces:
- internal/engine/livekit — generic LiveKit transport (URL+Token only,
  no service-specific assumptions). Registered as engine "livekit".
- internal/auth/wbstream — WB Stream API flow (guest register, join,
  token exchange). Implements auth.Provider and auth.RoomCreator,
  reports engine "livekit".

The carrier name "wbstream" now goes through registerEngineAuth, which
wires the auth provider to the engine it declares. CLI surface is
unchanged. session.Gen for wbstream calls the RoomCreator directly;
that path will become fully generic in a later step. jazz and telemost
remain on the legacy provider path for now.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This commit is contained in:
zarazaex69
2026-05-11 03:28:34 +03:00
parent 9388a8e494
commit 071106a674
12 changed files with 504 additions and 635 deletions

View File

@@ -8,6 +8,8 @@ import (
"slices"
"time"
"github.com/openlibrecommunity/olcrtc/internal/auth"
authWBStream "github.com/openlibrecommunity/olcrtc/internal/auth/wbstream"
"github.com/openlibrecommunity/olcrtc/internal/carrier"
"github.com/openlibrecommunity/olcrtc/internal/carrier/builtin"
"github.com/openlibrecommunity/olcrtc/internal/client"
@@ -15,7 +17,6 @@ import (
"github.com/openlibrecommunity/olcrtc/internal/link/direct"
"github.com/openlibrecommunity/olcrtc/internal/names"
"github.com/openlibrecommunity/olcrtc/internal/provider/jazz"
"github.com/openlibrecommunity/olcrtc/internal/provider/wbstream"
"github.com/openlibrecommunity/olcrtc/internal/server"
"github.com/openlibrecommunity/olcrtc/internal/transport"
"github.com/openlibrecommunity/olcrtc/internal/transport/datachannel"
@@ -443,6 +444,8 @@ func genRetry(ctx context.Context, fn func(context.Context) error) error {
}
// Gen creates cfg.Amount rooms for the configured carrier and writes each room ID to out.
//
//nolint:cyclop // transitional; refactor/universal-carrier replaces this with auth.RoomCreator dispatch
func Gen(ctx context.Context, cfg Config, out func(string)) error {
switch cfg.Carrier {
case carrierJazz:
@@ -462,13 +465,17 @@ func Gen(ctx context.Context, cfg Config, out func(string)) error {
out(roomID)
}
case carrierWBStream:
creator, ok := any(authWBStream.Provider{}).(auth.RoomCreator)
if !ok {
return fmt.Errorf("%w: wbstream auth provider does not implement RoomCreator", ErrUnsupportedCarrier)
}
for i := range cfg.Amount {
var roomID string
err := genRetry(ctx, func(ctx context.Context) error {
var err error
roomID, err = wbstream.CreateRoom(ctx, names.Generate())
roomID, err = creator.CreateRoom(ctx, auth.Config{Name: names.Generate()})
if err != nil {
return fmt.Errorf("wbstream.CreateRoom: %w", err)
return fmt.Errorf("wbstream CreateRoom: %w", err)
}
return nil
})

View File

@@ -1,3 +1,7 @@
// Package wbstream is the auth provider for the WB Stream service. It
// produces LiveKit credentials by registering a guest, optionally creating
// a room, joining it, and exchanging the guest access token for a room
// token.
package wbstream
import (
@@ -12,7 +16,9 @@ import (
"github.com/openlibrecommunity/olcrtc/internal/protect"
)
var apiBase = "https://stream.wb.ru" //nolint:gochecknoglobals // package-level state intentional
const wsURL = "wss://wbstream01-el.wb.ru:7880"
var apiBase = "https://stream.wb.ru" //nolint:gochecknoglobals // overridable base URL for tests
var (
errGuestRegister = errors.New("guest register failed")
@@ -126,19 +132,6 @@ func createRoom(ctx context.Context, accessToken string) (string, error) {
return res.RoomID, nil
}
// CreateRoom registers a temporary guest, creates a WB Stream room, and returns its id.
func CreateRoom(ctx context.Context, displayName string) (string, error) {
accessToken, err := registerGuest(ctx, displayName)
if err != nil {
return "", fmt.Errorf("register guest: %w", err)
}
roomID, err := createRoom(ctx, accessToken)
if err != nil {
return "", fmt.Errorf("create room: %w", err)
}
return roomID, nil
}
func joinRoom(ctx context.Context, accessToken, roomID string) error {
u := fmt.Sprintf("%s/api-room/api/v1/room/%s/join", apiBase, roomID)
req, err := http.NewRequestWithContext(ctx, http.MethodPost, u, bytes.NewReader([]byte("{}")))

View File

@@ -0,0 +1,66 @@
package wbstream
import (
"context"
"fmt"
"github.com/openlibrecommunity/olcrtc/internal/auth"
)
// Provider produces LiveKit credentials for the WB Stream service.
type Provider struct{}
// Engine reports which engine consumes credentials from this auth provider.
func (Provider) Engine() string { return "livekit" }
// Issue runs the WB Stream auth flow and returns LiveKit credentials.
//
// If cfg.RoomURL is empty or "any", a fresh room is created on the fly —
// keeping the behaviour the legacy wbstream provider had.
func (Provider) Issue(ctx context.Context, cfg auth.Config) (auth.Credentials, error) {
accessToken, err := registerGuest(ctx, cfg.Name)
if err != nil {
return auth.Credentials{}, fmt.Errorf("register guest: %w", err)
}
roomID := cfg.RoomURL
if roomID == "" || roomID == "any" {
roomID, err = createRoom(ctx, accessToken)
if err != nil {
return auth.Credentials{}, fmt.Errorf("create room: %w", err)
}
}
if err := joinRoom(ctx, accessToken, roomID); err != nil {
return auth.Credentials{}, fmt.Errorf("join room: %w", err)
}
token, err := getToken(ctx, accessToken, roomID, cfg.Name)
if err != nil {
return auth.Credentials{}, fmt.Errorf("get token: %w", err)
}
return auth.Credentials{
URL: wsURL,
Token: token,
Extra: map[string]string{"roomID": roomID},
}, nil
}
// CreateRoom registers a temporary guest and creates a WB Stream room.
// Used by gen mode.
func (Provider) CreateRoom(ctx context.Context, cfg auth.Config) (string, error) {
accessToken, err := registerGuest(ctx, cfg.Name)
if err != nil {
return "", fmt.Errorf("register guest: %w", err)
}
roomID, err := createRoom(ctx, accessToken)
if err != nil {
return "", fmt.Errorf("create room: %w", err)
}
return roomID, nil
}
func init() { //nolint:gochecknoinits // auth registration is the canonical Go pattern for plugins
auth.Register("wbstream", Provider{})
}

View File

@@ -0,0 +1,152 @@
package builtin
import (
"context"
"fmt"
"github.com/openlibrecommunity/olcrtc/internal/auth"
"github.com/openlibrecommunity/olcrtc/internal/carrier"
"github.com/openlibrecommunity/olcrtc/internal/engine"
"github.com/pion/webrtc/v4"
)
// registerEngineAuth registers a carrier name that resolves credentials
// through an auth provider and connects via the engine the auth provider
// reports.
func registerEngineAuth(carrierName string, authProvider auth.Provider) {
carrier.Register(carrierName, func(ctx context.Context, cfg carrier.Config) (carrier.Session, error) {
creds, err := authProvider.Issue(ctx, auth.Config{
RoomURL: cfg.RoomURL,
Name: cfg.Name,
DNSServer: cfg.DNSServer,
ProxyAddr: cfg.ProxyAddr,
ProxyPort: cfg.ProxyPort,
})
if err != nil {
return nil, fmt.Errorf("auth issue: %w", err)
}
sess, err := engine.New(ctx, authProvider.Engine(), engine.Config{
URL: creds.URL,
Token: creds.Token,
Name: cfg.Name,
OnData: cfg.OnData,
DNSServer: cfg.DNSServer,
ProxyAddr: cfg.ProxyAddr,
ProxyPort: cfg.ProxyPort,
})
if err != nil {
return nil, fmt.Errorf("engine new: %w", err)
}
return &engineSession{session: sess}, nil
})
}
type engineSession struct {
session engine.Session
}
func (s *engineSession) Capabilities() carrier.Capabilities {
caps := s.session.Capabilities()
return carrier.Capabilities{ByteStream: caps.ByteStream, VideoTrack: caps.VideoTrack}
}
func (s *engineSession) OpenByteStream() (carrier.ByteStream, error) {
if !s.session.Capabilities().ByteStream {
return nil, carrier.ErrByteStreamUnsupported
}
return &engineByteStream{session: s.session}, nil
}
func (s *engineSession) OpenVideoTrack() (carrier.VideoTrack, error) {
vt, ok := s.session.(engine.VideoTrackCapable)
if !ok {
return nil, carrier.ErrVideoTrackUnsupported
}
return &engineVideoTrack{session: s.session, vt: vt}, nil
}
type engineByteStream struct {
session engine.Session
}
func (b *engineByteStream) Connect(ctx context.Context) error {
if err := b.session.Connect(ctx); err != nil {
return fmt.Errorf("connect: %w", err)
}
return nil
}
func (b *engineByteStream) Send(data []byte) error {
if err := b.session.Send(data); err != nil {
return fmt.Errorf("send: %w", err)
}
return nil
}
func (b *engineByteStream) Close() error {
if err := b.session.Close(); err != nil {
return fmt.Errorf("close: %w", err)
}
return nil
}
func (b *engineByteStream) SetReconnectCallback(cb func()) {
b.session.SetReconnectCallback(func(_ *webrtc.DataChannel) {
if cb != nil {
cb()
}
})
}
func (b *engineByteStream) SetShouldReconnect(fn func() bool) { b.session.SetShouldReconnect(fn) }
func (b *engineByteStream) SetEndedCallback(cb func(string)) { b.session.SetEndedCallback(cb) }
func (b *engineByteStream) WatchConnection(ctx context.Context) {
b.session.WatchConnection(ctx)
}
func (b *engineByteStream) CanSend() bool { return b.session.CanSend() }
type engineVideoTrack struct {
session engine.Session
vt engine.VideoTrackCapable
}
func (v *engineVideoTrack) Connect(ctx context.Context) error {
if err := v.session.Connect(ctx); err != nil {
return fmt.Errorf("connect: %w", err)
}
return nil
}
func (v *engineVideoTrack) Close() error {
if err := v.session.Close(); err != nil {
return fmt.Errorf("close: %w", err)
}
return nil
}
func (v *engineVideoTrack) SetReconnectCallback(cb func()) {
v.session.SetReconnectCallback(func(_ *webrtc.DataChannel) {
if cb != nil {
cb()
}
})
}
func (v *engineVideoTrack) SetShouldReconnect(fn func() bool) { v.session.SetShouldReconnect(fn) }
func (v *engineVideoTrack) SetEndedCallback(cb func(string)) { v.session.SetEndedCallback(cb) }
func (v *engineVideoTrack) WatchConnection(ctx context.Context) {
v.session.WatchConnection(ctx)
}
func (v *engineVideoTrack) CanSend() bool { return v.session.CanSend() }
func (v *engineVideoTrack) AddTrack(track webrtc.TrackLocal) error {
if err := v.vt.AddVideoTrack(track); err != nil {
return fmt.Errorf("add track: %w", err)
}
return nil
}
func (v *engineVideoTrack) SetTrackHandler(cb func(*webrtc.TrackRemote, *webrtc.RTPReceiver)) {
v.vt.SetVideoTrackHandler(cb)
}

View File

@@ -4,20 +4,25 @@ package builtin
import (
"context"
authWBStream "github.com/openlibrecommunity/olcrtc/internal/auth/wbstream"
"github.com/openlibrecommunity/olcrtc/internal/carrier"
_ "github.com/openlibrecommunity/olcrtc/internal/engine/livekit" // engine registration via init
"github.com/openlibrecommunity/olcrtc/internal/provider"
"github.com/openlibrecommunity/olcrtc/internal/provider/jazz"
"github.com/openlibrecommunity/olcrtc/internal/provider/telemost"
"github.com/openlibrecommunity/olcrtc/internal/provider/wbstream"
)
type providerFactory func(context.Context, provider.Config) (provider.Provider, error)
// Register wires the built-in carriers into the carrier registry.
func Register() {
// Legacy provider-based carriers (still being migrated to engine+auth).
registerProvider("jazz", jazz.New)
registerProvider("telemost", telemost.New)
registerProvider("wbstream", wbstream.New)
// Migrated to engine+auth: WB Stream now goes through the LiveKit engine
// with the wbstream auth provider.
registerEngineAuth("wbstream", authWBStream.Provider{})
}
func registerProvider(name string, factory providerFactory) {

View File

@@ -17,11 +17,12 @@ import (
"time"
"github.com/openlibrecommunity/olcrtc/internal/app/session"
"github.com/openlibrecommunity/olcrtc/internal/auth"
authWBStream "github.com/openlibrecommunity/olcrtc/internal/auth/wbstream"
"github.com/openlibrecommunity/olcrtc/internal/carrier"
"github.com/openlibrecommunity/olcrtc/internal/client"
"github.com/openlibrecommunity/olcrtc/internal/link"
"github.com/openlibrecommunity/olcrtc/internal/provider/jazz"
"github.com/openlibrecommunity/olcrtc/internal/provider/wbstream"
"github.com/openlibrecommunity/olcrtc/internal/server"
"github.com/pion/webrtc/v4"
)
@@ -372,7 +373,7 @@ func realRoomURL(ctx context.Context, t *testing.T, carrierName string) string {
if *realE2EWBStreamRoom != "" {
return *realE2EWBStreamRoom
}
room, err := wbstream.CreateRoom(ctx, "olcrtc-e2e-room")
room, err := authWBStream.Provider{}.CreateRoom(ctx, auth.Config{Name: "olcrtc-e2e-room"})
if err != nil {
t.Fatalf("create real wbstream room: %v", err)
}

View File

@@ -0,0 +1,259 @@
// Package livekit implements an engine.Session backed by the LiveKit SFU
// protocol via the upstream livekit/server-sdk-go client.
//
// This engine is service-agnostic: it accepts a wss:// signaling URL and an
// access token, and provides byte-stream + video-track primitives over a
// LiveKit room. Service-specific token acquisition (e.g. WB Stream, Jazz,
// or a self-hosted LiveKit deployment) lives in the auth package.
package livekit
import (
"context"
"errors"
"fmt"
"log"
"sync"
"sync/atomic"
protoLogger "github.com/livekit/protocol/logger"
lksdk "github.com/livekit/server-sdk-go/v2"
"github.com/openlibrecommunity/olcrtc/internal/engine"
"github.com/pion/webrtc/v4"
)
const (
defaultSendQueueSize = 5000
dataPublishTopic = "olcrtc"
videoTrackName = "videochannel"
)
var (
// ErrSessionClosed is returned when an operation is attempted on a closed session.
ErrSessionClosed = errors.New("livekit session closed")
// ErrSendQueueFull is returned when the outbound queue cannot accept more data.
ErrSendQueueFull = errors.New("livekit send queue full")
// ErrRoomNotConnected is returned when the underlying room is not connected yet.
ErrRoomNotConnected = errors.New("livekit room not connected")
// ErrURLRequired is returned when no signaling URL was supplied.
ErrURLRequired = errors.New("livekit signaling URL required")
// ErrTokenRequired is returned when no access token was supplied.
ErrTokenRequired = errors.New("livekit access token required")
)
// Session is the LiveKit engine handle.
type Session struct {
url string
token string
name string
room *lksdk.Room
onData func([]byte)
onReconnect func(*webrtc.DataChannel)
shouldReconnect func() bool
onEnded func(string)
sendQueue chan []byte
closed atomic.Bool
done chan struct{}
cancel context.CancelFunc
videoTrackMu sync.RWMutex
videoTracks []webrtc.TrackLocal
onVideoTrack func(*webrtc.TrackRemote, *webrtc.RTPReceiver)
wg sync.WaitGroup
}
// New creates a new LiveKit engine session.
func New(ctx context.Context, cfg engine.Config) (engine.Session, error) {
if cfg.URL == "" {
return nil, ErrURLRequired
}
if cfg.Token == "" {
return nil, ErrTokenRequired
}
_, cancel := context.WithCancel(ctx)
return &Session{
url: cfg.URL,
token: cfg.Token,
name: cfg.Name,
onData: cfg.OnData,
sendQueue: make(chan []byte, defaultSendQueueSize),
done: make(chan struct{}),
cancel: cancel,
}, nil
}
// Capabilities reports what this engine can do.
func (s *Session) Capabilities() engine.Capabilities {
return engine.Capabilities{ByteStream: true, VideoTrack: true}
}
// Connect joins the LiveKit room.
func (s *Session) Connect(_ context.Context) error {
roomCB := &lksdk.RoomCallback{
ParticipantCallback: lksdk.ParticipantCallback{
OnDataReceived: func(data []byte, _ lksdk.DataReceiveParams) {
if s.onData != nil {
s.onData(data)
}
},
OnTrackSubscribed: func(track *webrtc.TrackRemote, _ *lksdk.RemoteTrackPublication, _ *lksdk.RemoteParticipant) {
if track.Kind() != webrtc.RTPCodecTypeVideo {
return
}
s.videoTrackMu.RLock()
cb := s.onVideoTrack
s.videoTrackMu.RUnlock()
if cb != nil {
cb(track, nil)
}
},
},
OnDisconnected: func() {
if !s.closed.Load() && s.onEnded != nil {
s.onEnded("disconnected from livekit")
}
},
}
room, err := lksdk.ConnectToRoomWithToken(
s.url,
s.token,
roomCB,
lksdk.WithAutoSubscribe(true),
lksdk.WithLogger(protoLogger.GetDiscardLogger()),
)
if err != nil {
return fmt.Errorf("connect to room: %w", err)
}
s.room = room
if err := s.publishPendingTracks(); err != nil {
return err
}
s.wg.Add(1)
go s.processSendQueue()
return nil
}
func (s *Session) publishPendingTracks() error {
s.videoTrackMu.RLock()
defer s.videoTrackMu.RUnlock()
for _, track := range s.videoTracks {
if _, err := s.room.LocalParticipant.PublishTrack(track, &lksdk.TrackPublicationOptions{
Name: videoTrackName,
}); err != nil {
return fmt.Errorf("failed to publish track: %w", err)
}
}
return nil
}
func (s *Session) processSendQueue() {
defer s.wg.Done()
for {
select {
case <-s.done:
return
case data, ok := <-s.sendQueue:
if !ok {
return
}
if err := s.room.LocalParticipant.PublishDataPacket(
lksdk.UserData(data),
lksdk.WithDataPublishTopic(dataPublishTopic),
lksdk.WithDataPublishReliable(true),
); err != nil {
log.Printf("livekit publish data error: %v", err)
}
}
}
}
// Send queues data for transmission.
func (s *Session) Send(data []byte) error {
if s.closed.Load() {
return ErrSessionClosed
}
select {
case s.sendQueue <- data:
return nil
default:
return ErrSendQueueFull
}
}
// Close terminates the session.
func (s *Session) Close() error {
if s.closed.CompareAndSwap(false, true) {
s.cancel()
close(s.done)
if s.room != nil {
s.unpublishLocalTracks()
s.room.Disconnect()
}
close(s.sendQueue)
s.wg.Wait()
}
return nil
}
func (s *Session) unpublishLocalTracks() {
if s.room == nil || s.room.LocalParticipant == nil {
return
}
for _, publication := range s.room.LocalParticipant.TrackPublications() {
if publication.SID() == "" {
continue
}
if err := s.room.LocalParticipant.UnpublishTrack(publication.SID()); err != nil {
log.Printf("livekit unpublish track error: %v", err)
}
}
}
// SetReconnectCallback stores the reconnect callback (LiveKit reconnects internally; this is kept for API parity).
func (s *Session) SetReconnectCallback(cb func(*webrtc.DataChannel)) { s.onReconnect = cb }
// SetShouldReconnect stores the reconnect predicate (kept for API parity).
func (s *Session) SetShouldReconnect(fn func() bool) { s.shouldReconnect = fn }
// SetEndedCallback registers a function to call when the session ends.
func (s *Session) SetEndedCallback(cb func(string)) { s.onEnded = cb }
// WatchConnection is a no-op; LiveKit handles connection supervision itself.
func (s *Session) WatchConnection(_ context.Context) {}
// CanSend reports whether the session is ready to accept data.
func (s *Session) CanSend() bool { return !s.closed.Load() && s.room != nil }
// GetSendQueue exposes the outbound queue.
func (s *Session) GetSendQueue() chan []byte { return s.sendQueue }
// GetBufferedAmount is a stub for LiveKit (the SDK handles its own buffering).
func (s *Session) GetBufferedAmount() uint64 { return 0 }
// AddVideoTrack publishes a video track to the room.
func (s *Session) AddVideoTrack(track webrtc.TrackLocal) error {
s.videoTrackMu.Lock()
s.videoTracks = append(s.videoTracks, track)
s.videoTrackMu.Unlock()
if s.room == nil || s.room.LocalParticipant == nil {
return nil
}
if _, err := s.room.LocalParticipant.PublishTrack(track, &lksdk.TrackPublicationOptions{
Name: videoTrackName,
}); err != nil {
return fmt.Errorf("failed to publish track: %w", err)
}
return nil
}
// SetVideoTrackHandler registers a callback for remote video tracks.
func (s *Session) SetVideoTrackHandler(cb func(*webrtc.TrackRemote, *webrtc.RTPReceiver)) {
s.videoTrackMu.Lock()
defer s.videoTrackMu.Unlock()
s.onVideoTrack = cb
}
func init() { //nolint:gochecknoinits // engine registration is the canonical Go pattern for plugins
engine.Register("livekit", New)
}

View File

@@ -1,124 +0,0 @@
package wbstream
import (
"context"
"encoding/json"
"errors"
"net/http"
"net/http/httptest"
"testing"
)
func withWBAPIServer(t *testing.T, h http.Handler) {
t.Helper()
old := apiBase
srv := httptest.NewServer(h)
t.Cleanup(func() {
apiBase = old
srv.Close()
})
apiBase = srv.URL
}
//nolint:cyclop // table-driven test naturally has many branches
func TestWBStreamAPIHappyPath(t *testing.T) {
withWBAPIServer(t, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
switch r.URL.Path {
case "/auth/api/v1/auth/user/guest-register":
if r.Method != http.MethodPost {
t.Fatalf("guest method = %s", r.Method)
}
_ = json.NewEncoder(w).Encode(guestRegisterResponse{AccessToken: "access"}) //nolint:goconst,gosec,lll // test literal; G117 is a false positive for test fixtures
case "/api-room/api/v2/room":
if r.Header.Get("Authorization") != "Bearer access" {
t.Fatalf("room auth = %q", r.Header.Get("Authorization"))
}
w.WriteHeader(http.StatusCreated)
_ = json.NewEncoder(w).Encode(createRoomResponse{RoomID: "room"}) //nolint:goconst,lll // test literal, repetition is intentional
case "/api-room/api/v1/room/room/join":
w.WriteHeader(http.StatusOK)
case "/api-room-manager/api/v1/room/room/token":
if r.URL.Query().Get("displayName") != "peer" {
t.Fatalf("displayName query = %q", r.URL.Query().Get("displayName"))
}
_ = json.NewEncoder(w).Encode(tokenResponse{RoomToken: "token"}) //nolint:goconst,lll // test literal, repetition is intentional
default:
http.NotFound(w, r)
}
}))
access, err := registerGuest(context.Background(), "peer")
if err != nil {
t.Fatalf("registerGuest() error = %v", err)
}
if access != "access" {
t.Fatalf("registerGuest() = %q", access)
}
room, err := createRoom(context.Background(), access)
if err != nil {
t.Fatalf("createRoom() error = %v", err)
}
if room != "room" {
t.Fatalf("createRoom() = %q", room)
}
if err := joinRoom(context.Background(), access, room); err != nil {
t.Fatalf("joinRoom() error = %v", err)
}
token, err := getToken(context.Background(), access, room, "peer")
if err != nil {
t.Fatalf("getToken() error = %v", err)
}
if token != "token" {
t.Fatalf("getToken() = %q", token)
}
}
func TestWBStreamAPIErrors(t *testing.T) {
withWBAPIServer(t, http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
http.Error(w, "bad", http.StatusBadGateway)
}))
if _, err := registerGuest(context.Background(), "peer"); !errors.Is(err, errGuestRegister) {
t.Fatalf("registerGuest() error = %v, want %v", err, errGuestRegister)
}
if _, err := createRoom(context.Background(), "access"); !errors.Is(err, errCreateRoom) {
t.Fatalf("createRoom() error = %v, want %v", err, errCreateRoom)
}
if err := joinRoom(context.Background(), "access", "room"); !errors.Is(err, errJoinRoom) {
t.Fatalf("joinRoom() error = %v, want %v", err, errJoinRoom)
}
if _, err := getToken(context.Background(), "access", "room", "peer"); !errors.Is(err, errGetToken) {
t.Fatalf("getToken() error = %v, want %v", err, errGetToken)
}
}
func TestWBStreamGetRoomToken(t *testing.T) {
withWBAPIServer(t, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
switch r.URL.Path {
case "/auth/api/v1/auth/user/guest-register":
_ = json.NewEncoder(w).Encode(guestRegisterResponse{AccessToken: "access"}) //nolint:gosec,lll // G117: test-only struct mirroring upstream API shape
case "/api-room/api/v2/room":
_ = json.NewEncoder(w).Encode(createRoomResponse{RoomID: "created"})
case "/api-room/api/v1/room/created/join":
w.WriteHeader(http.StatusOK)
case "/api-room-manager/api/v1/room/created/token":
_ = json.NewEncoder(w).Encode(tokenResponse{RoomToken: "token"})
default:
http.NotFound(w, r)
}
}))
p, err := NewPeer(context.Background(), "any", "peer", nil)
if err != nil {
t.Fatalf("NewPeer() error = %v", err)
}
token, err := p.getRoomToken(context.Background())
if err != nil {
t.Fatalf("getRoomToken() error = %v", err)
}
if token != "token" {
t.Fatalf("getRoomToken() = %q", token)
}
}

View File

@@ -1,280 +0,0 @@
// Package wbstream implements the WB Stream WebRTC provider.
package wbstream
import (
"context"
"errors"
"fmt"
"log"
"sync"
"sync/atomic"
protoLogger "github.com/livekit/protocol/logger"
lksdk "github.com/livekit/server-sdk-go/v2"
"github.com/pion/webrtc/v4"
)
const (
wsURL = "wss://wbstream01-el.wb.ru:7880"
)
var (
// ErrPeerClosed is returned when an operation is attempted on a closed peer.
ErrPeerClosed = errors.New("peer closed")
// ErrSendQueueFull is returned when the transmission queue is full.
ErrSendQueueFull = errors.New("send queue full")
// ErrLiveKitNotConnected is returned when the LiveKit room is not connected.
ErrLiveKitNotConnected = errors.New("livekit room not connected")
)
// Peer represents a WB Stream WebRTC connection using LiveKit.
type Peer struct {
roomURL string
name string
room *lksdk.Room
onData func([]byte)
onReconnect func(*webrtc.DataChannel)
shouldReconnect func() bool
onEnded func(string)
sendQueue chan []byte
closed atomic.Bool
done chan struct{}
cancel context.CancelFunc
videoTrackMu sync.RWMutex
videoTracks []webrtc.TrackLocal
onVideoTrack func(*webrtc.TrackRemote, *webrtc.RTPReceiver)
wg sync.WaitGroup
}
// NewPeer creates a new WB Stream provider peer.
func NewPeer(ctx context.Context, roomURL, name string, onData func([]byte)) (*Peer, error) {
_, cancel := context.WithCancel(ctx)
return &Peer{
roomURL: roomURL,
name: name,
onData: onData,
sendQueue: make(chan []byte, 5000),
done: make(chan struct{}),
cancel: cancel,
}, nil
}
// Connect starts the WebRTC connection process.
func (p *Peer) Connect(ctx context.Context) error {
token, err := p.getRoomToken(ctx)
if err != nil {
return fmt.Errorf("get room token: %w", err)
}
roomCB := &lksdk.RoomCallback{
ParticipantCallback: lksdk.ParticipantCallback{
OnDataReceived: func(data []byte, _ lksdk.DataReceiveParams) {
if p.onData != nil {
p.onData(data)
}
},
OnTrackSubscribed: func(track *webrtc.TrackRemote, _ *lksdk.RemoteTrackPublication, _ *lksdk.RemoteParticipant) {
if track.Kind() != webrtc.RTPCodecTypeVideo {
return
}
p.videoTrackMu.RLock()
cb := p.onVideoTrack
p.videoTrackMu.RUnlock()
if cb != nil {
cb(track, nil)
}
},
},
OnDisconnected: func() {
if !p.closed.Load() && p.onEnded != nil {
p.onEnded("disconnected from livekit")
}
},
}
room, err := lksdk.ConnectToRoomWithToken(
wsURL,
token,
roomCB,
lksdk.WithAutoSubscribe(true),
lksdk.WithLogger(protoLogger.GetDiscardLogger()),
)
if err != nil {
return fmt.Errorf("connect to room: %w", err)
}
p.room = room
if err := p.publishPendingTracks(); err != nil {
return err
}
p.wg.Add(1)
go p.processSendQueue()
return nil
}
func (p *Peer) publishPendingTracks() error {
p.videoTrackMu.RLock()
defer p.videoTrackMu.RUnlock()
for _, track := range p.videoTracks {
if _, err := p.room.LocalParticipant.PublishTrack(track, &lksdk.TrackPublicationOptions{
Name: "videochannel",
}); err != nil {
return fmt.Errorf("failed to publish track: %w", err)
}
}
return nil
}
func (p *Peer) getRoomToken(ctx context.Context) (string, error) {
accessToken, err := registerGuest(ctx, p.name)
if err != nil {
return "", fmt.Errorf("register guest: %w", err)
}
roomID := p.roomURL
if roomID == "" || roomID == "any" {
roomID, err = createRoom(ctx, accessToken)
if err != nil {
return "", fmt.Errorf("create room: %w", err)
}
log.Printf("WB Stream room created: %s", roomID)
log.Printf("To connect client use: -id %s", roomID)
}
if err := joinRoom(ctx, accessToken, roomID); err != nil {
return "", fmt.Errorf("join room: %w", err)
}
token, err := getToken(ctx, accessToken, roomID, p.name)
if err != nil {
return "", fmt.Errorf("get token: %w", err)
}
return token, nil
}
func (p *Peer) processSendQueue() {
defer p.wg.Done()
for {
select {
case <-p.done:
return
case data, ok := <-p.sendQueue:
if !ok {
return
}
if err := p.room.LocalParticipant.PublishDataPacket(
lksdk.UserData(data),
lksdk.WithDataPublishTopic("olcrtc"),
lksdk.WithDataPublishReliable(true),
); err != nil {
log.Printf("WB Stream publish data error: %v", err)
}
}
}
}
// Send transmits data to the room.
func (p *Peer) Send(data []byte) error {
if p.closed.Load() {
return ErrPeerClosed
}
select {
case p.sendQueue <- data:
return nil
default:
return ErrSendQueueFull
}
}
// Close terminates the provider connection.
func (p *Peer) Close() error {
if p.closed.CompareAndSwap(false, true) {
p.cancel()
close(p.done)
if p.room != nil {
p.unpublishLocalTracks()
p.room.Disconnect()
}
close(p.sendQueue)
p.wg.Wait()
}
return nil
}
func (p *Peer) unpublishLocalTracks() {
if p.room == nil || p.room.LocalParticipant == nil {
return
}
for _, publication := range p.room.LocalParticipant.TrackPublications() {
if publication.SID() == "" {
continue
}
if err := p.room.LocalParticipant.UnpublishTrack(publication.SID()); err != nil {
log.Printf("WB Stream unpublish track error: %v", err)
}
}
}
// SetReconnectCallback is a stub for WB Stream.
func (p *Peer) SetReconnectCallback(cb func(*webrtc.DataChannel)) {
p.onReconnect = cb
}
// SetShouldReconnect is a stub for WB Stream.
func (p *Peer) SetShouldReconnect(fn func() bool) {
p.shouldReconnect = fn
}
// SetEndedCallback sets the function to call when the session ends.
func (p *Peer) SetEndedCallback(cb func(string)) {
p.onEnded = cb
}
// WatchConnection is a stub for WB Stream.
func (p *Peer) WatchConnection(_ context.Context) {}
// CanSend checks if the provider is ready to transmit data.
func (p *Peer) CanSend() bool {
return !p.closed.Load() && p.room != nil
}
// GetSendQueue returns the data transmission queue.
func (p *Peer) GetSendQueue() chan []byte {
return p.sendQueue
}
// GetBufferedAmount is a stub for WB Stream.
func (p *Peer) GetBufferedAmount() uint64 {
return 0
}
// AddVideoTrack adds a video track to the LiveKit room.
func (p *Peer) AddVideoTrack(track webrtc.TrackLocal) error {
p.videoTrackMu.Lock()
p.videoTracks = append(p.videoTracks, track)
p.videoTrackMu.Unlock()
if p.room == nil || p.room.LocalParticipant == nil {
return nil
}
if _, err := p.room.LocalParticipant.PublishTrack(track, &lksdk.TrackPublicationOptions{
Name: "videochannel",
}); err != nil {
return fmt.Errorf("failed to publish track: %w", err)
}
return nil
}
// SetVideoTrackHandler registers a callback for remote video tracks.
func (p *Peer) SetVideoTrackHandler(cb func(*webrtc.TrackRemote, *webrtc.RTPReceiver)) {
p.videoTrackMu.Lock()
defer p.videoTrackMu.Unlock()
p.onVideoTrack = cb
}

View File

@@ -1,76 +0,0 @@
package wbstream
import (
"context"
"errors"
"testing"
"github.com/pion/webrtc/v4"
)
func TestNewPeerAndSimpleAccessors(t *testing.T) {
p, err := NewPeer(context.Background(), "room", "name", func([]byte) {})
if err != nil {
t.Fatalf("NewPeer() error = %v", err)
}
if p.roomURL != "room" || p.name != "name" || p.sendQueue == nil || p.done == nil { //nolint:goconst,lll // test literal, repetition is intentional
t.Fatalf("NewPeer() = %+v", p)
}
if p.GetSendQueue() != p.sendQueue {
t.Fatal("GetSendQueue() did not return sendQueue")
}
if p.GetBufferedAmount() != 0 {
t.Fatal("GetBufferedAmount() != 0")
}
if p.CanSend() {
t.Fatal("CanSend() = true without room")
}
}
func TestSendQueueAndClose(t *testing.T) {
p, err := NewPeer(context.Background(), "room", "name", nil)
if err != nil {
t.Fatalf("NewPeer() error = %v", err)
}
p.sendQueue = make(chan []byte, 1)
if err := p.Send([]byte("one")); err != nil {
t.Fatalf("Send() error = %v", err)
}
if err := p.Send([]byte("two")); !errors.Is(err, ErrSendQueueFull) {
t.Fatalf("Send() error = %v, want %v", err, ErrSendQueueFull)
}
if err := p.Close(); err != nil {
t.Fatalf("Close() error = %v", err)
}
if err := p.Send([]byte("closed")); !errors.Is(err, ErrPeerClosed) {
t.Fatalf("Send() error = %v, want %v", err, ErrPeerClosed)
}
if err := p.Close(); err != nil {
t.Fatalf("second Close() error = %v", err)
}
}
func TestCallbacksAndVideoTrackStorage(t *testing.T) {
p, err := NewPeer(context.Background(), "room", "name", nil)
if err != nil {
t.Fatalf("NewPeer() error = %v", err)
}
p.SetReconnectCallback(func(*webrtc.DataChannel) {})
p.SetShouldReconnect(func() bool { return true })
p.SetEndedCallback(func(string) {})
p.SetVideoTrackHandler(func(*webrtc.TrackRemote, *webrtc.RTPReceiver) {})
p.WatchConnection(context.Background())
if p.onReconnect == nil || p.shouldReconnect == nil || p.onEnded == nil || p.onVideoTrack == nil {
t.Fatal("callbacks were not stored")
}
if err := p.AddVideoTrack(nil); err != nil {
t.Fatalf("AddVideoTrack(nil) error = %v", err)
}
if len(p.videoTracks) != 1 {
t.Fatalf("videoTracks len = %d, want 1", len(p.videoTracks))
}
}

View File

@@ -1,84 +0,0 @@
// Package wbstream implements the WB Stream WebRTC provider.
package wbstream
import (
"context"
"fmt"
"github.com/openlibrecommunity/olcrtc/internal/provider"
"github.com/pion/webrtc/v4"
)
type wbStreamProvider struct {
peer *Peer
}
// New creates a new WB Stream provider instance.
func New(ctx context.Context, cfg provider.Config) (provider.Provider, error) {
peer, err := NewPeer(ctx, cfg.RoomURL, cfg.Name, cfg.OnData)
if err != nil {
return nil, fmt.Errorf("create wbstream peer: %w", err)
}
return &wbStreamProvider{peer: peer}, nil
}
// Connect starts the provider connection.
func (w *wbStreamProvider) Connect(ctx context.Context) error {
return w.peer.Connect(ctx)
}
// Send transmits data to the room.
func (w *wbStreamProvider) Send(data []byte) error {
return w.peer.Send(data)
}
// Close terminates the provider connection.
func (w *wbStreamProvider) Close() error {
return w.peer.Close()
}
// SetReconnectCallback sets the function to call on reconnection.
func (w *wbStreamProvider) SetReconnectCallback(cb func(*webrtc.DataChannel)) {
w.peer.SetReconnectCallback(cb)
}
// SetShouldReconnect sets the function to determine if reconnection should occur.
func (w *wbStreamProvider) SetShouldReconnect(fn func() bool) {
w.peer.SetShouldReconnect(fn)
}
// SetEndedCallback sets the function to call when the session ends.
func (w *wbStreamProvider) SetEndedCallback(cb func(string)) {
w.peer.SetEndedCallback(cb)
}
// WatchConnection monitors the provider connection state.
func (w *wbStreamProvider) WatchConnection(ctx context.Context) {
w.peer.WatchConnection(ctx)
}
// CanSend checks if the provider is ready to transmit data.
func (w *wbStreamProvider) CanSend() bool {
return w.peer.CanSend()
}
// GetSendQueue returns the data transmission queue.
func (w *wbStreamProvider) GetSendQueue() chan []byte {
return w.peer.GetSendQueue()
}
// GetBufferedAmount returns the current WebRTC buffered amount.
func (w *wbStreamProvider) GetBufferedAmount() uint64 {
return w.peer.GetBufferedAmount()
}
// AddVideoTrack adds a video track to the wbstream connection.
func (w *wbStreamProvider) AddVideoTrack(track webrtc.TrackLocal) error {
return w.peer.AddVideoTrack(track)
}
// SetVideoTrackHandler registers a callback for subscribed remote video tracks.
func (w *wbStreamProvider) SetVideoTrackHandler(cb func(*webrtc.TrackRemote, *webrtc.RTPReceiver)) {
w.peer.SetVideoTrackHandler(cb)
}

View File

@@ -1,50 +0,0 @@
package wbstream
import (
"context"
"errors"
"testing"
"github.com/pion/webrtc/v4"
)
//nolint:cyclop // table-driven test naturally has many branches
func TestWBStreamProviderForwardsPeerMethods(t *testing.T) {
peer, err := NewPeer(context.Background(), "room", "name", nil)
if err != nil {
t.Fatalf("NewPeer() error = %v", err)
}
p := &wbStreamProvider{peer: peer}
p.SetReconnectCallback(func(*webrtc.DataChannel) {})
p.SetShouldReconnect(func() bool { return true })
p.SetEndedCallback(func(string) {})
p.SetVideoTrackHandler(func(*webrtc.TrackRemote, *webrtc.RTPReceiver) {})
if peer.onReconnect == nil || peer.shouldReconnect == nil || peer.onEnded == nil || peer.onVideoTrack == nil {
t.Fatal("callbacks were not forwarded")
}
if p.GetSendQueue() != peer.sendQueue {
t.Fatal("GetSendQueue() did not forward")
}
if p.GetBufferedAmount() != 0 {
t.Fatal("GetBufferedAmount() != 0")
}
if err := p.AddVideoTrack(nil); err != nil {
t.Fatalf("AddVideoTrack(nil) error = %v", err)
}
if p.CanSend() {
t.Fatal("CanSend() = true without LiveKit room")
}
p.WatchConnection(context.Background())
if err := p.Send([]byte("x")); err != nil {
t.Fatalf("Send() error = %v", err)
}
if err := p.Close(); err != nil {
t.Fatalf("Close() error = %v", err)
}
if err := p.Send([]byte("x")); !errors.Is(err, ErrPeerClosed) {
t.Fatalf("Send() error = %v, want peer closed", err)
}
}