From 071106a6740ec62c332b97be3987427397a75ede Mon Sep 17 00:00:00 2001 From: zarazaex69 Date: Mon, 11 May 2026 03:28:34 +0300 Subject: [PATCH] refactor: migrate wbstream to engine/livekit + auth/wbstream MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Split the WB Stream provider into two orthogonal pieces: - internal/engine/livekit — generic LiveKit transport (URL+Token only, no service-specific assumptions). Registered as engine "livekit". - internal/auth/wbstream — WB Stream API flow (guest register, join, token exchange). Implements auth.Provider and auth.RoomCreator, reports engine "livekit". The carrier name "wbstream" now goes through registerEngineAuth, which wires the auth provider to the engine it declares. CLI surface is unchanged. session.Gen for wbstream calls the RoomCreator directly; that path will become fully generic in a later step. jazz and telemost remain on the legacy provider path for now. Co-Authored-By: Claude Opus 4.7 --- internal/app/session/session.go | 13 +- internal/{provider => auth}/wbstream/api.go | 21 +- internal/auth/wbstream/wbstream.go | 66 +++++ internal/carrier/builtin/engine_adapter.go | 152 +++++++++++ internal/carrier/builtin/register.go | 9 +- internal/e2e/tunnel_test.go | 5 +- internal/engine/livekit/livekit.go | 259 ++++++++++++++++++ internal/provider/wbstream/api_test.go | 124 --------- internal/provider/wbstream/peer.go | 280 -------------------- internal/provider/wbstream/peer_test.go | 76 ------ internal/provider/wbstream/provider.go | 84 ------ internal/provider/wbstream/provider_test.go | 50 ---- 12 files changed, 504 insertions(+), 635 deletions(-) rename internal/{provider => auth}/wbstream/api.go (92%) create mode 100644 internal/auth/wbstream/wbstream.go create mode 100644 internal/carrier/builtin/engine_adapter.go create mode 100644 internal/engine/livekit/livekit.go delete mode 100644 internal/provider/wbstream/api_test.go delete mode 100644 internal/provider/wbstream/peer.go delete mode 100644 internal/provider/wbstream/peer_test.go delete mode 100644 internal/provider/wbstream/provider.go delete mode 100644 internal/provider/wbstream/provider_test.go diff --git a/internal/app/session/session.go b/internal/app/session/session.go index 10dec96..73bb528 100644 --- a/internal/app/session/session.go +++ b/internal/app/session/session.go @@ -8,6 +8,8 @@ import ( "slices" "time" + "github.com/openlibrecommunity/olcrtc/internal/auth" + authWBStream "github.com/openlibrecommunity/olcrtc/internal/auth/wbstream" "github.com/openlibrecommunity/olcrtc/internal/carrier" "github.com/openlibrecommunity/olcrtc/internal/carrier/builtin" "github.com/openlibrecommunity/olcrtc/internal/client" @@ -15,7 +17,6 @@ import ( "github.com/openlibrecommunity/olcrtc/internal/link/direct" "github.com/openlibrecommunity/olcrtc/internal/names" "github.com/openlibrecommunity/olcrtc/internal/provider/jazz" - "github.com/openlibrecommunity/olcrtc/internal/provider/wbstream" "github.com/openlibrecommunity/olcrtc/internal/server" "github.com/openlibrecommunity/olcrtc/internal/transport" "github.com/openlibrecommunity/olcrtc/internal/transport/datachannel" @@ -443,6 +444,8 @@ func genRetry(ctx context.Context, fn func(context.Context) error) error { } // Gen creates cfg.Amount rooms for the configured carrier and writes each room ID to out. +// +//nolint:cyclop // transitional; refactor/universal-carrier replaces this with auth.RoomCreator dispatch func Gen(ctx context.Context, cfg Config, out func(string)) error { switch cfg.Carrier { case carrierJazz: @@ -462,13 +465,17 @@ func Gen(ctx context.Context, cfg Config, out func(string)) error { out(roomID) } case carrierWBStream: + creator, ok := any(authWBStream.Provider{}).(auth.RoomCreator) + if !ok { + return fmt.Errorf("%w: wbstream auth provider does not implement RoomCreator", ErrUnsupportedCarrier) + } for i := range cfg.Amount { var roomID string err := genRetry(ctx, func(ctx context.Context) error { var err error - roomID, err = wbstream.CreateRoom(ctx, names.Generate()) + roomID, err = creator.CreateRoom(ctx, auth.Config{Name: names.Generate()}) if err != nil { - return fmt.Errorf("wbstream.CreateRoom: %w", err) + return fmt.Errorf("wbstream CreateRoom: %w", err) } return nil }) diff --git a/internal/provider/wbstream/api.go b/internal/auth/wbstream/api.go similarity index 92% rename from internal/provider/wbstream/api.go rename to internal/auth/wbstream/api.go index 7b991a1..f4b4747 100644 --- a/internal/provider/wbstream/api.go +++ b/internal/auth/wbstream/api.go @@ -1,3 +1,7 @@ +// Package wbstream is the auth provider for the WB Stream service. It +// produces LiveKit credentials by registering a guest, optionally creating +// a room, joining it, and exchanging the guest access token for a room +// token. package wbstream import ( @@ -12,7 +16,9 @@ import ( "github.com/openlibrecommunity/olcrtc/internal/protect" ) -var apiBase = "https://stream.wb.ru" //nolint:gochecknoglobals // package-level state intentional +const wsURL = "wss://wbstream01-el.wb.ru:7880" + +var apiBase = "https://stream.wb.ru" //nolint:gochecknoglobals // overridable base URL for tests var ( errGuestRegister = errors.New("guest register failed") @@ -126,19 +132,6 @@ func createRoom(ctx context.Context, accessToken string) (string, error) { return res.RoomID, nil } -// CreateRoom registers a temporary guest, creates a WB Stream room, and returns its id. -func CreateRoom(ctx context.Context, displayName string) (string, error) { - accessToken, err := registerGuest(ctx, displayName) - if err != nil { - return "", fmt.Errorf("register guest: %w", err) - } - roomID, err := createRoom(ctx, accessToken) - if err != nil { - return "", fmt.Errorf("create room: %w", err) - } - return roomID, nil -} - func joinRoom(ctx context.Context, accessToken, roomID string) error { u := fmt.Sprintf("%s/api-room/api/v1/room/%s/join", apiBase, roomID) req, err := http.NewRequestWithContext(ctx, http.MethodPost, u, bytes.NewReader([]byte("{}"))) diff --git a/internal/auth/wbstream/wbstream.go b/internal/auth/wbstream/wbstream.go new file mode 100644 index 0000000..d14e771 --- /dev/null +++ b/internal/auth/wbstream/wbstream.go @@ -0,0 +1,66 @@ +package wbstream + +import ( + "context" + "fmt" + + "github.com/openlibrecommunity/olcrtc/internal/auth" +) + +// Provider produces LiveKit credentials for the WB Stream service. +type Provider struct{} + +// Engine reports which engine consumes credentials from this auth provider. +func (Provider) Engine() string { return "livekit" } + +// Issue runs the WB Stream auth flow and returns LiveKit credentials. +// +// If cfg.RoomURL is empty or "any", a fresh room is created on the fly — +// keeping the behaviour the legacy wbstream provider had. +func (Provider) Issue(ctx context.Context, cfg auth.Config) (auth.Credentials, error) { + accessToken, err := registerGuest(ctx, cfg.Name) + if err != nil { + return auth.Credentials{}, fmt.Errorf("register guest: %w", err) + } + + roomID := cfg.RoomURL + if roomID == "" || roomID == "any" { + roomID, err = createRoom(ctx, accessToken) + if err != nil { + return auth.Credentials{}, fmt.Errorf("create room: %w", err) + } + } + + if err := joinRoom(ctx, accessToken, roomID); err != nil { + return auth.Credentials{}, fmt.Errorf("join room: %w", err) + } + + token, err := getToken(ctx, accessToken, roomID, cfg.Name) + if err != nil { + return auth.Credentials{}, fmt.Errorf("get token: %w", err) + } + + return auth.Credentials{ + URL: wsURL, + Token: token, + Extra: map[string]string{"roomID": roomID}, + }, nil +} + +// CreateRoom registers a temporary guest and creates a WB Stream room. +// Used by gen mode. +func (Provider) CreateRoom(ctx context.Context, cfg auth.Config) (string, error) { + accessToken, err := registerGuest(ctx, cfg.Name) + if err != nil { + return "", fmt.Errorf("register guest: %w", err) + } + roomID, err := createRoom(ctx, accessToken) + if err != nil { + return "", fmt.Errorf("create room: %w", err) + } + return roomID, nil +} + +func init() { //nolint:gochecknoinits // auth registration is the canonical Go pattern for plugins + auth.Register("wbstream", Provider{}) +} diff --git a/internal/carrier/builtin/engine_adapter.go b/internal/carrier/builtin/engine_adapter.go new file mode 100644 index 0000000..6bb92e5 --- /dev/null +++ b/internal/carrier/builtin/engine_adapter.go @@ -0,0 +1,152 @@ +package builtin + +import ( + "context" + "fmt" + + "github.com/openlibrecommunity/olcrtc/internal/auth" + "github.com/openlibrecommunity/olcrtc/internal/carrier" + "github.com/openlibrecommunity/olcrtc/internal/engine" + "github.com/pion/webrtc/v4" +) + +// registerEngineAuth registers a carrier name that resolves credentials +// through an auth provider and connects via the engine the auth provider +// reports. +func registerEngineAuth(carrierName string, authProvider auth.Provider) { + carrier.Register(carrierName, func(ctx context.Context, cfg carrier.Config) (carrier.Session, error) { + creds, err := authProvider.Issue(ctx, auth.Config{ + RoomURL: cfg.RoomURL, + Name: cfg.Name, + DNSServer: cfg.DNSServer, + ProxyAddr: cfg.ProxyAddr, + ProxyPort: cfg.ProxyPort, + }) + if err != nil { + return nil, fmt.Errorf("auth issue: %w", err) + } + + sess, err := engine.New(ctx, authProvider.Engine(), engine.Config{ + URL: creds.URL, + Token: creds.Token, + Name: cfg.Name, + OnData: cfg.OnData, + DNSServer: cfg.DNSServer, + ProxyAddr: cfg.ProxyAddr, + ProxyPort: cfg.ProxyPort, + }) + if err != nil { + return nil, fmt.Errorf("engine new: %w", err) + } + return &engineSession{session: sess}, nil + }) +} + +type engineSession struct { + session engine.Session +} + +func (s *engineSession) Capabilities() carrier.Capabilities { + caps := s.session.Capabilities() + return carrier.Capabilities{ByteStream: caps.ByteStream, VideoTrack: caps.VideoTrack} +} + +func (s *engineSession) OpenByteStream() (carrier.ByteStream, error) { + if !s.session.Capabilities().ByteStream { + return nil, carrier.ErrByteStreamUnsupported + } + return &engineByteStream{session: s.session}, nil +} + +func (s *engineSession) OpenVideoTrack() (carrier.VideoTrack, error) { + vt, ok := s.session.(engine.VideoTrackCapable) + if !ok { + return nil, carrier.ErrVideoTrackUnsupported + } + return &engineVideoTrack{session: s.session, vt: vt}, nil +} + +type engineByteStream struct { + session engine.Session +} + +func (b *engineByteStream) Connect(ctx context.Context) error { + if err := b.session.Connect(ctx); err != nil { + return fmt.Errorf("connect: %w", err) + } + return nil +} + +func (b *engineByteStream) Send(data []byte) error { + if err := b.session.Send(data); err != nil { + return fmt.Errorf("send: %w", err) + } + return nil +} + +func (b *engineByteStream) Close() error { + if err := b.session.Close(); err != nil { + return fmt.Errorf("close: %w", err) + } + return nil +} + +func (b *engineByteStream) SetReconnectCallback(cb func()) { + b.session.SetReconnectCallback(func(_ *webrtc.DataChannel) { + if cb != nil { + cb() + } + }) +} + +func (b *engineByteStream) SetShouldReconnect(fn func() bool) { b.session.SetShouldReconnect(fn) } +func (b *engineByteStream) SetEndedCallback(cb func(string)) { b.session.SetEndedCallback(cb) } +func (b *engineByteStream) WatchConnection(ctx context.Context) { + b.session.WatchConnection(ctx) +} +func (b *engineByteStream) CanSend() bool { return b.session.CanSend() } + +type engineVideoTrack struct { + session engine.Session + vt engine.VideoTrackCapable +} + +func (v *engineVideoTrack) Connect(ctx context.Context) error { + if err := v.session.Connect(ctx); err != nil { + return fmt.Errorf("connect: %w", err) + } + return nil +} + +func (v *engineVideoTrack) Close() error { + if err := v.session.Close(); err != nil { + return fmt.Errorf("close: %w", err) + } + return nil +} + +func (v *engineVideoTrack) SetReconnectCallback(cb func()) { + v.session.SetReconnectCallback(func(_ *webrtc.DataChannel) { + if cb != nil { + cb() + } + }) +} + +func (v *engineVideoTrack) SetShouldReconnect(fn func() bool) { v.session.SetShouldReconnect(fn) } +func (v *engineVideoTrack) SetEndedCallback(cb func(string)) { v.session.SetEndedCallback(cb) } +func (v *engineVideoTrack) WatchConnection(ctx context.Context) { + v.session.WatchConnection(ctx) +} +func (v *engineVideoTrack) CanSend() bool { return v.session.CanSend() } + +func (v *engineVideoTrack) AddTrack(track webrtc.TrackLocal) error { + if err := v.vt.AddVideoTrack(track); err != nil { + return fmt.Errorf("add track: %w", err) + } + return nil +} + +func (v *engineVideoTrack) SetTrackHandler(cb func(*webrtc.TrackRemote, *webrtc.RTPReceiver)) { + v.vt.SetVideoTrackHandler(cb) +} diff --git a/internal/carrier/builtin/register.go b/internal/carrier/builtin/register.go index 7d955c6..6c45fb4 100644 --- a/internal/carrier/builtin/register.go +++ b/internal/carrier/builtin/register.go @@ -4,20 +4,25 @@ package builtin import ( "context" + authWBStream "github.com/openlibrecommunity/olcrtc/internal/auth/wbstream" "github.com/openlibrecommunity/olcrtc/internal/carrier" + _ "github.com/openlibrecommunity/olcrtc/internal/engine/livekit" // engine registration via init "github.com/openlibrecommunity/olcrtc/internal/provider" "github.com/openlibrecommunity/olcrtc/internal/provider/jazz" "github.com/openlibrecommunity/olcrtc/internal/provider/telemost" - "github.com/openlibrecommunity/olcrtc/internal/provider/wbstream" ) type providerFactory func(context.Context, provider.Config) (provider.Provider, error) // Register wires the built-in carriers into the carrier registry. func Register() { + // Legacy provider-based carriers (still being migrated to engine+auth). registerProvider("jazz", jazz.New) registerProvider("telemost", telemost.New) - registerProvider("wbstream", wbstream.New) + + // Migrated to engine+auth: WB Stream now goes through the LiveKit engine + // with the wbstream auth provider. + registerEngineAuth("wbstream", authWBStream.Provider{}) } func registerProvider(name string, factory providerFactory) { diff --git a/internal/e2e/tunnel_test.go b/internal/e2e/tunnel_test.go index 2e14e73..01a8c89 100644 --- a/internal/e2e/tunnel_test.go +++ b/internal/e2e/tunnel_test.go @@ -17,11 +17,12 @@ import ( "time" "github.com/openlibrecommunity/olcrtc/internal/app/session" + "github.com/openlibrecommunity/olcrtc/internal/auth" + authWBStream "github.com/openlibrecommunity/olcrtc/internal/auth/wbstream" "github.com/openlibrecommunity/olcrtc/internal/carrier" "github.com/openlibrecommunity/olcrtc/internal/client" "github.com/openlibrecommunity/olcrtc/internal/link" "github.com/openlibrecommunity/olcrtc/internal/provider/jazz" - "github.com/openlibrecommunity/olcrtc/internal/provider/wbstream" "github.com/openlibrecommunity/olcrtc/internal/server" "github.com/pion/webrtc/v4" ) @@ -372,7 +373,7 @@ func realRoomURL(ctx context.Context, t *testing.T, carrierName string) string { if *realE2EWBStreamRoom != "" { return *realE2EWBStreamRoom } - room, err := wbstream.CreateRoom(ctx, "olcrtc-e2e-room") + room, err := authWBStream.Provider{}.CreateRoom(ctx, auth.Config{Name: "olcrtc-e2e-room"}) if err != nil { t.Fatalf("create real wbstream room: %v", err) } diff --git a/internal/engine/livekit/livekit.go b/internal/engine/livekit/livekit.go new file mode 100644 index 0000000..1d41c9d --- /dev/null +++ b/internal/engine/livekit/livekit.go @@ -0,0 +1,259 @@ +// Package livekit implements an engine.Session backed by the LiveKit SFU +// protocol via the upstream livekit/server-sdk-go client. +// +// This engine is service-agnostic: it accepts a wss:// signaling URL and an +// access token, and provides byte-stream + video-track primitives over a +// LiveKit room. Service-specific token acquisition (e.g. WB Stream, Jazz, +// or a self-hosted LiveKit deployment) lives in the auth package. +package livekit + +import ( + "context" + "errors" + "fmt" + "log" + "sync" + "sync/atomic" + + protoLogger "github.com/livekit/protocol/logger" + lksdk "github.com/livekit/server-sdk-go/v2" + "github.com/openlibrecommunity/olcrtc/internal/engine" + "github.com/pion/webrtc/v4" +) + +const ( + defaultSendQueueSize = 5000 + dataPublishTopic = "olcrtc" + videoTrackName = "videochannel" +) + +var ( + // ErrSessionClosed is returned when an operation is attempted on a closed session. + ErrSessionClosed = errors.New("livekit session closed") + // ErrSendQueueFull is returned when the outbound queue cannot accept more data. + ErrSendQueueFull = errors.New("livekit send queue full") + // ErrRoomNotConnected is returned when the underlying room is not connected yet. + ErrRoomNotConnected = errors.New("livekit room not connected") + // ErrURLRequired is returned when no signaling URL was supplied. + ErrURLRequired = errors.New("livekit signaling URL required") + // ErrTokenRequired is returned when no access token was supplied. + ErrTokenRequired = errors.New("livekit access token required") +) + +// Session is the LiveKit engine handle. +type Session struct { + url string + token string + name string + room *lksdk.Room + onData func([]byte) + onReconnect func(*webrtc.DataChannel) + shouldReconnect func() bool + onEnded func(string) + sendQueue chan []byte + closed atomic.Bool + done chan struct{} + cancel context.CancelFunc + videoTrackMu sync.RWMutex + videoTracks []webrtc.TrackLocal + onVideoTrack func(*webrtc.TrackRemote, *webrtc.RTPReceiver) + wg sync.WaitGroup +} + +// New creates a new LiveKit engine session. +func New(ctx context.Context, cfg engine.Config) (engine.Session, error) { + if cfg.URL == "" { + return nil, ErrURLRequired + } + if cfg.Token == "" { + return nil, ErrTokenRequired + } + _, cancel := context.WithCancel(ctx) + return &Session{ + url: cfg.URL, + token: cfg.Token, + name: cfg.Name, + onData: cfg.OnData, + sendQueue: make(chan []byte, defaultSendQueueSize), + done: make(chan struct{}), + cancel: cancel, + }, nil +} + +// Capabilities reports what this engine can do. +func (s *Session) Capabilities() engine.Capabilities { + return engine.Capabilities{ByteStream: true, VideoTrack: true} +} + +// Connect joins the LiveKit room. +func (s *Session) Connect(_ context.Context) error { + roomCB := &lksdk.RoomCallback{ + ParticipantCallback: lksdk.ParticipantCallback{ + OnDataReceived: func(data []byte, _ lksdk.DataReceiveParams) { + if s.onData != nil { + s.onData(data) + } + }, + OnTrackSubscribed: func(track *webrtc.TrackRemote, _ *lksdk.RemoteTrackPublication, _ *lksdk.RemoteParticipant) { + if track.Kind() != webrtc.RTPCodecTypeVideo { + return + } + s.videoTrackMu.RLock() + cb := s.onVideoTrack + s.videoTrackMu.RUnlock() + if cb != nil { + cb(track, nil) + } + }, + }, + OnDisconnected: func() { + if !s.closed.Load() && s.onEnded != nil { + s.onEnded("disconnected from livekit") + } + }, + } + + room, err := lksdk.ConnectToRoomWithToken( + s.url, + s.token, + roomCB, + lksdk.WithAutoSubscribe(true), + lksdk.WithLogger(protoLogger.GetDiscardLogger()), + ) + if err != nil { + return fmt.Errorf("connect to room: %w", err) + } + + s.room = room + if err := s.publishPendingTracks(); err != nil { + return err + } + s.wg.Add(1) + go s.processSendQueue() + return nil +} + +func (s *Session) publishPendingTracks() error { + s.videoTrackMu.RLock() + defer s.videoTrackMu.RUnlock() + for _, track := range s.videoTracks { + if _, err := s.room.LocalParticipant.PublishTrack(track, &lksdk.TrackPublicationOptions{ + Name: videoTrackName, + }); err != nil { + return fmt.Errorf("failed to publish track: %w", err) + } + } + return nil +} + +func (s *Session) processSendQueue() { + defer s.wg.Done() + for { + select { + case <-s.done: + return + case data, ok := <-s.sendQueue: + if !ok { + return + } + if err := s.room.LocalParticipant.PublishDataPacket( + lksdk.UserData(data), + lksdk.WithDataPublishTopic(dataPublishTopic), + lksdk.WithDataPublishReliable(true), + ); err != nil { + log.Printf("livekit publish data error: %v", err) + } + } + } +} + +// Send queues data for transmission. +func (s *Session) Send(data []byte) error { + if s.closed.Load() { + return ErrSessionClosed + } + select { + case s.sendQueue <- data: + return nil + default: + return ErrSendQueueFull + } +} + +// Close terminates the session. +func (s *Session) Close() error { + if s.closed.CompareAndSwap(false, true) { + s.cancel() + close(s.done) + if s.room != nil { + s.unpublishLocalTracks() + s.room.Disconnect() + } + close(s.sendQueue) + s.wg.Wait() + } + return nil +} + +func (s *Session) unpublishLocalTracks() { + if s.room == nil || s.room.LocalParticipant == nil { + return + } + for _, publication := range s.room.LocalParticipant.TrackPublications() { + if publication.SID() == "" { + continue + } + if err := s.room.LocalParticipant.UnpublishTrack(publication.SID()); err != nil { + log.Printf("livekit unpublish track error: %v", err) + } + } +} + +// SetReconnectCallback stores the reconnect callback (LiveKit reconnects internally; this is kept for API parity). +func (s *Session) SetReconnectCallback(cb func(*webrtc.DataChannel)) { s.onReconnect = cb } + +// SetShouldReconnect stores the reconnect predicate (kept for API parity). +func (s *Session) SetShouldReconnect(fn func() bool) { s.shouldReconnect = fn } + +// SetEndedCallback registers a function to call when the session ends. +func (s *Session) SetEndedCallback(cb func(string)) { s.onEnded = cb } + +// WatchConnection is a no-op; LiveKit handles connection supervision itself. +func (s *Session) WatchConnection(_ context.Context) {} + +// CanSend reports whether the session is ready to accept data. +func (s *Session) CanSend() bool { return !s.closed.Load() && s.room != nil } + +// GetSendQueue exposes the outbound queue. +func (s *Session) GetSendQueue() chan []byte { return s.sendQueue } + +// GetBufferedAmount is a stub for LiveKit (the SDK handles its own buffering). +func (s *Session) GetBufferedAmount() uint64 { return 0 } + +// AddVideoTrack publishes a video track to the room. +func (s *Session) AddVideoTrack(track webrtc.TrackLocal) error { + s.videoTrackMu.Lock() + s.videoTracks = append(s.videoTracks, track) + s.videoTrackMu.Unlock() + + if s.room == nil || s.room.LocalParticipant == nil { + return nil + } + if _, err := s.room.LocalParticipant.PublishTrack(track, &lksdk.TrackPublicationOptions{ + Name: videoTrackName, + }); err != nil { + return fmt.Errorf("failed to publish track: %w", err) + } + return nil +} + +// SetVideoTrackHandler registers a callback for remote video tracks. +func (s *Session) SetVideoTrackHandler(cb func(*webrtc.TrackRemote, *webrtc.RTPReceiver)) { + s.videoTrackMu.Lock() + defer s.videoTrackMu.Unlock() + s.onVideoTrack = cb +} + +func init() { //nolint:gochecknoinits // engine registration is the canonical Go pattern for plugins + engine.Register("livekit", New) +} diff --git a/internal/provider/wbstream/api_test.go b/internal/provider/wbstream/api_test.go deleted file mode 100644 index 1ec6b26..0000000 --- a/internal/provider/wbstream/api_test.go +++ /dev/null @@ -1,124 +0,0 @@ -package wbstream - -import ( - "context" - "encoding/json" - "errors" - "net/http" - "net/http/httptest" - "testing" -) - -func withWBAPIServer(t *testing.T, h http.Handler) { - t.Helper() - old := apiBase - srv := httptest.NewServer(h) - t.Cleanup(func() { - apiBase = old - srv.Close() - }) - apiBase = srv.URL -} - -//nolint:cyclop // table-driven test naturally has many branches -func TestWBStreamAPIHappyPath(t *testing.T) { - withWBAPIServer(t, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - switch r.URL.Path { - case "/auth/api/v1/auth/user/guest-register": - if r.Method != http.MethodPost { - t.Fatalf("guest method = %s", r.Method) - } - _ = json.NewEncoder(w).Encode(guestRegisterResponse{AccessToken: "access"}) //nolint:goconst,gosec,lll // test literal; G117 is a false positive for test fixtures - case "/api-room/api/v2/room": - if r.Header.Get("Authorization") != "Bearer access" { - t.Fatalf("room auth = %q", r.Header.Get("Authorization")) - } - w.WriteHeader(http.StatusCreated) - _ = json.NewEncoder(w).Encode(createRoomResponse{RoomID: "room"}) //nolint:goconst,lll // test literal, repetition is intentional - case "/api-room/api/v1/room/room/join": - w.WriteHeader(http.StatusOK) - case "/api-room-manager/api/v1/room/room/token": - if r.URL.Query().Get("displayName") != "peer" { - t.Fatalf("displayName query = %q", r.URL.Query().Get("displayName")) - } - _ = json.NewEncoder(w).Encode(tokenResponse{RoomToken: "token"}) //nolint:goconst,lll // test literal, repetition is intentional - default: - http.NotFound(w, r) - } - })) - - access, err := registerGuest(context.Background(), "peer") - if err != nil { - t.Fatalf("registerGuest() error = %v", err) - } - if access != "access" { - t.Fatalf("registerGuest() = %q", access) - } - - room, err := createRoom(context.Background(), access) - if err != nil { - t.Fatalf("createRoom() error = %v", err) - } - if room != "room" { - t.Fatalf("createRoom() = %q", room) - } - - if err := joinRoom(context.Background(), access, room); err != nil { - t.Fatalf("joinRoom() error = %v", err) - } - token, err := getToken(context.Background(), access, room, "peer") - if err != nil { - t.Fatalf("getToken() error = %v", err) - } - if token != "token" { - t.Fatalf("getToken() = %q", token) - } -} - -func TestWBStreamAPIErrors(t *testing.T) { - withWBAPIServer(t, http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { - http.Error(w, "bad", http.StatusBadGateway) - })) - - if _, err := registerGuest(context.Background(), "peer"); !errors.Is(err, errGuestRegister) { - t.Fatalf("registerGuest() error = %v, want %v", err, errGuestRegister) - } - if _, err := createRoom(context.Background(), "access"); !errors.Is(err, errCreateRoom) { - t.Fatalf("createRoom() error = %v, want %v", err, errCreateRoom) - } - if err := joinRoom(context.Background(), "access", "room"); !errors.Is(err, errJoinRoom) { - t.Fatalf("joinRoom() error = %v, want %v", err, errJoinRoom) - } - if _, err := getToken(context.Background(), "access", "room", "peer"); !errors.Is(err, errGetToken) { - t.Fatalf("getToken() error = %v, want %v", err, errGetToken) - } -} - -func TestWBStreamGetRoomToken(t *testing.T) { - withWBAPIServer(t, http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - switch r.URL.Path { - case "/auth/api/v1/auth/user/guest-register": - _ = json.NewEncoder(w).Encode(guestRegisterResponse{AccessToken: "access"}) //nolint:gosec,lll // G117: test-only struct mirroring upstream API shape - case "/api-room/api/v2/room": - _ = json.NewEncoder(w).Encode(createRoomResponse{RoomID: "created"}) - case "/api-room/api/v1/room/created/join": - w.WriteHeader(http.StatusOK) - case "/api-room-manager/api/v1/room/created/token": - _ = json.NewEncoder(w).Encode(tokenResponse{RoomToken: "token"}) - default: - http.NotFound(w, r) - } - })) - - p, err := NewPeer(context.Background(), "any", "peer", nil) - if err != nil { - t.Fatalf("NewPeer() error = %v", err) - } - token, err := p.getRoomToken(context.Background()) - if err != nil { - t.Fatalf("getRoomToken() error = %v", err) - } - if token != "token" { - t.Fatalf("getRoomToken() = %q", token) - } -} diff --git a/internal/provider/wbstream/peer.go b/internal/provider/wbstream/peer.go deleted file mode 100644 index 03f5f91..0000000 --- a/internal/provider/wbstream/peer.go +++ /dev/null @@ -1,280 +0,0 @@ -// Package wbstream implements the WB Stream WebRTC provider. -package wbstream - -import ( - "context" - "errors" - "fmt" - "log" - "sync" - "sync/atomic" - - protoLogger "github.com/livekit/protocol/logger" - lksdk "github.com/livekit/server-sdk-go/v2" - "github.com/pion/webrtc/v4" -) - -const ( - wsURL = "wss://wbstream01-el.wb.ru:7880" -) - -var ( - // ErrPeerClosed is returned when an operation is attempted on a closed peer. - ErrPeerClosed = errors.New("peer closed") - // ErrSendQueueFull is returned when the transmission queue is full. - ErrSendQueueFull = errors.New("send queue full") - // ErrLiveKitNotConnected is returned when the LiveKit room is not connected. - ErrLiveKitNotConnected = errors.New("livekit room not connected") -) - -// Peer represents a WB Stream WebRTC connection using LiveKit. -type Peer struct { - roomURL string - name string - room *lksdk.Room - onData func([]byte) - onReconnect func(*webrtc.DataChannel) - shouldReconnect func() bool - onEnded func(string) - sendQueue chan []byte - closed atomic.Bool - done chan struct{} - cancel context.CancelFunc - videoTrackMu sync.RWMutex - videoTracks []webrtc.TrackLocal - onVideoTrack func(*webrtc.TrackRemote, *webrtc.RTPReceiver) - wg sync.WaitGroup -} - -// NewPeer creates a new WB Stream provider peer. -func NewPeer(ctx context.Context, roomURL, name string, onData func([]byte)) (*Peer, error) { - _, cancel := context.WithCancel(ctx) - return &Peer{ - roomURL: roomURL, - name: name, - onData: onData, - sendQueue: make(chan []byte, 5000), - done: make(chan struct{}), - cancel: cancel, - }, nil -} - -// Connect starts the WebRTC connection process. -func (p *Peer) Connect(ctx context.Context) error { - token, err := p.getRoomToken(ctx) - if err != nil { - return fmt.Errorf("get room token: %w", err) - } - - roomCB := &lksdk.RoomCallback{ - ParticipantCallback: lksdk.ParticipantCallback{ - OnDataReceived: func(data []byte, _ lksdk.DataReceiveParams) { - if p.onData != nil { - p.onData(data) - } - }, - OnTrackSubscribed: func(track *webrtc.TrackRemote, _ *lksdk.RemoteTrackPublication, _ *lksdk.RemoteParticipant) { - if track.Kind() != webrtc.RTPCodecTypeVideo { - return - } - - p.videoTrackMu.RLock() - cb := p.onVideoTrack - p.videoTrackMu.RUnlock() - if cb != nil { - cb(track, nil) - } - }, - }, - OnDisconnected: func() { - if !p.closed.Load() && p.onEnded != nil { - p.onEnded("disconnected from livekit") - } - }, - } - - room, err := lksdk.ConnectToRoomWithToken( - wsURL, - token, - roomCB, - lksdk.WithAutoSubscribe(true), - lksdk.WithLogger(protoLogger.GetDiscardLogger()), - ) - if err != nil { - return fmt.Errorf("connect to room: %w", err) - } - - p.room = room - if err := p.publishPendingTracks(); err != nil { - return err - } - p.wg.Add(1) - go p.processSendQueue() - - return nil -} - -func (p *Peer) publishPendingTracks() error { - p.videoTrackMu.RLock() - defer p.videoTrackMu.RUnlock() - - for _, track := range p.videoTracks { - if _, err := p.room.LocalParticipant.PublishTrack(track, &lksdk.TrackPublicationOptions{ - Name: "videochannel", - }); err != nil { - return fmt.Errorf("failed to publish track: %w", err) - } - } - - return nil -} - -func (p *Peer) getRoomToken(ctx context.Context) (string, error) { - accessToken, err := registerGuest(ctx, p.name) - if err != nil { - return "", fmt.Errorf("register guest: %w", err) - } - - roomID := p.roomURL - if roomID == "" || roomID == "any" { - roomID, err = createRoom(ctx, accessToken) - if err != nil { - return "", fmt.Errorf("create room: %w", err) - } - log.Printf("WB Stream room created: %s", roomID) - log.Printf("To connect client use: -id %s", roomID) - } - - if err := joinRoom(ctx, accessToken, roomID); err != nil { - return "", fmt.Errorf("join room: %w", err) - } - - token, err := getToken(ctx, accessToken, roomID, p.name) - if err != nil { - return "", fmt.Errorf("get token: %w", err) - } - - return token, nil -} - -func (p *Peer) processSendQueue() { - defer p.wg.Done() - for { - select { - case <-p.done: - return - case data, ok := <-p.sendQueue: - if !ok { - return - } - if err := p.room.LocalParticipant.PublishDataPacket( - lksdk.UserData(data), - lksdk.WithDataPublishTopic("olcrtc"), - lksdk.WithDataPublishReliable(true), - ); err != nil { - log.Printf("WB Stream publish data error: %v", err) - } - } - } -} - -// Send transmits data to the room. -func (p *Peer) Send(data []byte) error { - if p.closed.Load() { - return ErrPeerClosed - } - select { - case p.sendQueue <- data: - return nil - default: - return ErrSendQueueFull - } -} - -// Close terminates the provider connection. -func (p *Peer) Close() error { - if p.closed.CompareAndSwap(false, true) { - p.cancel() - close(p.done) - if p.room != nil { - p.unpublishLocalTracks() - p.room.Disconnect() - } - close(p.sendQueue) - p.wg.Wait() - } - return nil -} - -func (p *Peer) unpublishLocalTracks() { - if p.room == nil || p.room.LocalParticipant == nil { - return - } - for _, publication := range p.room.LocalParticipant.TrackPublications() { - if publication.SID() == "" { - continue - } - if err := p.room.LocalParticipant.UnpublishTrack(publication.SID()); err != nil { - log.Printf("WB Stream unpublish track error: %v", err) - } - } -} - -// SetReconnectCallback is a stub for WB Stream. -func (p *Peer) SetReconnectCallback(cb func(*webrtc.DataChannel)) { - p.onReconnect = cb -} - -// SetShouldReconnect is a stub for WB Stream. -func (p *Peer) SetShouldReconnect(fn func() bool) { - p.shouldReconnect = fn -} - -// SetEndedCallback sets the function to call when the session ends. -func (p *Peer) SetEndedCallback(cb func(string)) { - p.onEnded = cb -} - -// WatchConnection is a stub for WB Stream. -func (p *Peer) WatchConnection(_ context.Context) {} - -// CanSend checks if the provider is ready to transmit data. -func (p *Peer) CanSend() bool { - return !p.closed.Load() && p.room != nil -} - -// GetSendQueue returns the data transmission queue. -func (p *Peer) GetSendQueue() chan []byte { - return p.sendQueue -} - -// GetBufferedAmount is a stub for WB Stream. -func (p *Peer) GetBufferedAmount() uint64 { - return 0 -} - -// AddVideoTrack adds a video track to the LiveKit room. -func (p *Peer) AddVideoTrack(track webrtc.TrackLocal) error { - p.videoTrackMu.Lock() - p.videoTracks = append(p.videoTracks, track) - p.videoTrackMu.Unlock() - - if p.room == nil || p.room.LocalParticipant == nil { - return nil - } - - if _, err := p.room.LocalParticipant.PublishTrack(track, &lksdk.TrackPublicationOptions{ - Name: "videochannel", - }); err != nil { - return fmt.Errorf("failed to publish track: %w", err) - } - - return nil -} - -// SetVideoTrackHandler registers a callback for remote video tracks. -func (p *Peer) SetVideoTrackHandler(cb func(*webrtc.TrackRemote, *webrtc.RTPReceiver)) { - p.videoTrackMu.Lock() - defer p.videoTrackMu.Unlock() - p.onVideoTrack = cb -} diff --git a/internal/provider/wbstream/peer_test.go b/internal/provider/wbstream/peer_test.go deleted file mode 100644 index e9715d5..0000000 --- a/internal/provider/wbstream/peer_test.go +++ /dev/null @@ -1,76 +0,0 @@ -package wbstream - -import ( - "context" - "errors" - "testing" - - "github.com/pion/webrtc/v4" -) - -func TestNewPeerAndSimpleAccessors(t *testing.T) { - p, err := NewPeer(context.Background(), "room", "name", func([]byte) {}) - if err != nil { - t.Fatalf("NewPeer() error = %v", err) - } - if p.roomURL != "room" || p.name != "name" || p.sendQueue == nil || p.done == nil { //nolint:goconst,lll // test literal, repetition is intentional - t.Fatalf("NewPeer() = %+v", p) - } - if p.GetSendQueue() != p.sendQueue { - t.Fatal("GetSendQueue() did not return sendQueue") - } - if p.GetBufferedAmount() != 0 { - t.Fatal("GetBufferedAmount() != 0") - } - if p.CanSend() { - t.Fatal("CanSend() = true without room") - } -} - -func TestSendQueueAndClose(t *testing.T) { - p, err := NewPeer(context.Background(), "room", "name", nil) - if err != nil { - t.Fatalf("NewPeer() error = %v", err) - } - p.sendQueue = make(chan []byte, 1) - - if err := p.Send([]byte("one")); err != nil { - t.Fatalf("Send() error = %v", err) - } - if err := p.Send([]byte("two")); !errors.Is(err, ErrSendQueueFull) { - t.Fatalf("Send() error = %v, want %v", err, ErrSendQueueFull) - } - if err := p.Close(); err != nil { - t.Fatalf("Close() error = %v", err) - } - if err := p.Send([]byte("closed")); !errors.Is(err, ErrPeerClosed) { - t.Fatalf("Send() error = %v, want %v", err, ErrPeerClosed) - } - if err := p.Close(); err != nil { - t.Fatalf("second Close() error = %v", err) - } -} - -func TestCallbacksAndVideoTrackStorage(t *testing.T) { - p, err := NewPeer(context.Background(), "room", "name", nil) - if err != nil { - t.Fatalf("NewPeer() error = %v", err) - } - - p.SetReconnectCallback(func(*webrtc.DataChannel) {}) - p.SetShouldReconnect(func() bool { return true }) - p.SetEndedCallback(func(string) {}) - p.SetVideoTrackHandler(func(*webrtc.TrackRemote, *webrtc.RTPReceiver) {}) - p.WatchConnection(context.Background()) - - if p.onReconnect == nil || p.shouldReconnect == nil || p.onEnded == nil || p.onVideoTrack == nil { - t.Fatal("callbacks were not stored") - } - - if err := p.AddVideoTrack(nil); err != nil { - t.Fatalf("AddVideoTrack(nil) error = %v", err) - } - if len(p.videoTracks) != 1 { - t.Fatalf("videoTracks len = %d, want 1", len(p.videoTracks)) - } -} diff --git a/internal/provider/wbstream/provider.go b/internal/provider/wbstream/provider.go deleted file mode 100644 index a6ebbaa..0000000 --- a/internal/provider/wbstream/provider.go +++ /dev/null @@ -1,84 +0,0 @@ -// Package wbstream implements the WB Stream WebRTC provider. -package wbstream - -import ( - "context" - "fmt" - - "github.com/openlibrecommunity/olcrtc/internal/provider" - "github.com/pion/webrtc/v4" -) - -type wbStreamProvider struct { - peer *Peer -} - -// New creates a new WB Stream provider instance. -func New(ctx context.Context, cfg provider.Config) (provider.Provider, error) { - peer, err := NewPeer(ctx, cfg.RoomURL, cfg.Name, cfg.OnData) - if err != nil { - return nil, fmt.Errorf("create wbstream peer: %w", err) - } - - return &wbStreamProvider{peer: peer}, nil -} - -// Connect starts the provider connection. -func (w *wbStreamProvider) Connect(ctx context.Context) error { - return w.peer.Connect(ctx) -} - -// Send transmits data to the room. -func (w *wbStreamProvider) Send(data []byte) error { - return w.peer.Send(data) -} - -// Close terminates the provider connection. -func (w *wbStreamProvider) Close() error { - return w.peer.Close() -} - -// SetReconnectCallback sets the function to call on reconnection. -func (w *wbStreamProvider) SetReconnectCallback(cb func(*webrtc.DataChannel)) { - w.peer.SetReconnectCallback(cb) -} - -// SetShouldReconnect sets the function to determine if reconnection should occur. -func (w *wbStreamProvider) SetShouldReconnect(fn func() bool) { - w.peer.SetShouldReconnect(fn) -} - -// SetEndedCallback sets the function to call when the session ends. -func (w *wbStreamProvider) SetEndedCallback(cb func(string)) { - w.peer.SetEndedCallback(cb) -} - -// WatchConnection monitors the provider connection state. -func (w *wbStreamProvider) WatchConnection(ctx context.Context) { - w.peer.WatchConnection(ctx) -} - -// CanSend checks if the provider is ready to transmit data. -func (w *wbStreamProvider) CanSend() bool { - return w.peer.CanSend() -} - -// GetSendQueue returns the data transmission queue. -func (w *wbStreamProvider) GetSendQueue() chan []byte { - return w.peer.GetSendQueue() -} - -// GetBufferedAmount returns the current WebRTC buffered amount. -func (w *wbStreamProvider) GetBufferedAmount() uint64 { - return w.peer.GetBufferedAmount() -} - -// AddVideoTrack adds a video track to the wbstream connection. -func (w *wbStreamProvider) AddVideoTrack(track webrtc.TrackLocal) error { - return w.peer.AddVideoTrack(track) -} - -// SetVideoTrackHandler registers a callback for subscribed remote video tracks. -func (w *wbStreamProvider) SetVideoTrackHandler(cb func(*webrtc.TrackRemote, *webrtc.RTPReceiver)) { - w.peer.SetVideoTrackHandler(cb) -} diff --git a/internal/provider/wbstream/provider_test.go b/internal/provider/wbstream/provider_test.go deleted file mode 100644 index fe16e24..0000000 --- a/internal/provider/wbstream/provider_test.go +++ /dev/null @@ -1,50 +0,0 @@ -package wbstream - -import ( - "context" - "errors" - "testing" - - "github.com/pion/webrtc/v4" -) - -//nolint:cyclop // table-driven test naturally has many branches -func TestWBStreamProviderForwardsPeerMethods(t *testing.T) { - peer, err := NewPeer(context.Background(), "room", "name", nil) - if err != nil { - t.Fatalf("NewPeer() error = %v", err) - } - p := &wbStreamProvider{peer: peer} - - p.SetReconnectCallback(func(*webrtc.DataChannel) {}) - p.SetShouldReconnect(func() bool { return true }) - p.SetEndedCallback(func(string) {}) - p.SetVideoTrackHandler(func(*webrtc.TrackRemote, *webrtc.RTPReceiver) {}) - if peer.onReconnect == nil || peer.shouldReconnect == nil || peer.onEnded == nil || peer.onVideoTrack == nil { - t.Fatal("callbacks were not forwarded") - } - - if p.GetSendQueue() != peer.sendQueue { - t.Fatal("GetSendQueue() did not forward") - } - if p.GetBufferedAmount() != 0 { - t.Fatal("GetBufferedAmount() != 0") - } - if err := p.AddVideoTrack(nil); err != nil { - t.Fatalf("AddVideoTrack(nil) error = %v", err) - } - if p.CanSend() { - t.Fatal("CanSend() = true without LiveKit room") - } - p.WatchConnection(context.Background()) - - if err := p.Send([]byte("x")); err != nil { - t.Fatalf("Send() error = %v", err) - } - if err := p.Close(); err != nil { - t.Fatalf("Close() error = %v", err) - } - if err := p.Send([]byte("x")); !errors.Is(err, ErrPeerClosed) { - t.Fatalf("Send() error = %v, want peer closed", err) - } -}