From 9c992d6fe4c4e449934386e093d5a7f37f27077b Mon Sep 17 00:00:00 2001 From: zarazaex69 Date: Thu, 7 May 2026 14:28:32 +0300 Subject: [PATCH] refactor: remove legacy code --- Dockerfile | 2 +- cmd/olcrtc/main.go | 13 +- cmd/olcrtc/main_test.go | 33 +-- docker-compose.server.yml | 2 +- docs/uri.md | 2 +- internal/app/session/session.go | 2 +- internal/carrier/builtin/provider_adapter.go | 121 +++++++++++ .../carrier/builtin/provider_adapter_test.go | 198 +++++++++++++++++ internal/carrier/builtin/register.go | 30 ++- internal/carrier/bytestream.go | 109 ---------- internal/carrier/carrier.go | 20 -- internal/carrier/carrier_test.go | 205 +----------------- internal/provider/provider.go | 33 --- internal/provider/provider_test.go | 75 ------- internal/transport/datachannel/transport.go | 6 +- .../transport/datachannel/transport_test.go | 2 +- internal/transport/seichannel/transport.go | 4 +- .../seichannel/transport_unit_test.go | 2 +- internal/transport/videochannel/transport.go | 4 +- .../videochannel/transport_unit_test.go | 2 +- internal/transport/vp8channel/transport.go | 4 +- .../vp8channel/transport_unit_test.go | 2 +- mobile/mobile.go | 2 +- script/docker/olcrtc-entrypoint.sh | 8 +- 24 files changed, 380 insertions(+), 501 deletions(-) create mode 100644 internal/carrier/builtin/provider_adapter.go create mode 100644 internal/carrier/builtin/provider_adapter_test.go delete mode 100644 internal/provider/provider_test.go diff --git a/Dockerfile b/Dockerfile index 6321ee1..532700b 100644 --- a/Dockerfile +++ b/Dockerfile @@ -42,7 +42,7 @@ USER olcrtc:olcrtc WORKDIR /var/lib/olcrtc ENV OLCRTC_MODE=srv \ - OLCRTC_PROVIDER= \ + OLCRTC_CARRIER= \ OLCRTC_DATA_DIR=/usr/share/olcrtc \ OLCRTC_DNS=1.1.1.1:53 \ OLCRTC_KEY_FILE=/var/lib/olcrtc/key.hex diff --git a/cmd/olcrtc/main.go b/cmd/olcrtc/main.go index 1bff663..81d156c 100644 --- a/cmd/olcrtc/main.go +++ b/cmd/olcrtc/main.go @@ -33,7 +33,6 @@ type config struct { carrier string roomID string clientID string - provider string socksPort int socksHost string keyHex string @@ -140,7 +139,6 @@ func parseFlagsFrom(args []string, errorHandling flag.ErrorHandling) (config, er fs.StringVar(&cfg.carrier, "carrier", "", "Carrier: telemost, jazz, wbstream") fs.StringVar(&cfg.roomID, "id", "", "Room ID") fs.StringVar(&cfg.clientID, "client-id", "", "Client ID: binds one srv to one cnc (required)") - fs.StringVar(&cfg.provider, "provider", "", "Deprecated alias for -carrier") fs.IntVar(&cfg.socksPort, "socks-port", 0, "SOCKS5 port (client only)") fs.StringVar(&cfg.socksHost, "socks-host", "", "SOCKS5 listen host (client only)") fs.StringVar(&cfg.keyHex, "key", "", "Shared encryption key (hex)") @@ -210,7 +208,7 @@ func toSessionConfig(cfg config) session.Config { Mode: cfg.mode, Link: cfg.link, Transport: cfg.transport, - Carrier: firstNonEmpty(cfg.carrier, cfg.provider), + Carrier: cfg.carrier, RoomID: cfg.roomID, ClientID: cfg.clientID, KeyHex: cfg.keyHex, @@ -238,15 +236,6 @@ func toSessionConfig(cfg config) session.Config { } } -func firstNonEmpty(values ...string) string { - for _, value := range values { - if value != "" { - return value - } - } - return "" -} - func waitForShutdown(errCh <-chan error) error { done := make(chan error, 1) go func() { diff --git a/cmd/olcrtc/main_test.go b/cmd/olcrtc/main_test.go index 9ae01d2..bf393ff 100644 --- a/cmd/olcrtc/main_test.go +++ b/cmd/olcrtc/main_test.go @@ -12,12 +12,12 @@ import ( "github.com/openlibrecommunity/olcrtc/internal/logger" ) -func TestToSessionConfigAndFirstNonEmpty(t *testing.T) { +func TestToSessionConfig(t *testing.T) { cfg := config{ mode: "cnc", link: "direct", transport: "vp8channel", - provider: "jazz", + carrier: "jazz", roomID: "room", clientID: "client", keyHex: "key", @@ -52,18 +52,6 @@ func TestToSessionConfigAndFirstNonEmpty(t *testing.T) { t.Fatalf("toSessionConfig() = %+v", got) } - cfg.carrier = "telemost" - got = toSessionConfig(cfg) - if got.Carrier != "telemost" { - t.Fatalf("carrier precedence = %q, want telemost", got.Carrier) - } - - if got := firstNonEmpty("", "", "x", "y"); got != "x" { - t.Fatalf("firstNonEmpty() = %q, want x", got) - } - if got := firstNonEmpty("", ""); got != "" { - t.Fatalf("firstNonEmpty(empty) = %q, want empty", got) - } } func TestParseFlagsFrom(t *testing.T) { @@ -249,20 +237,3 @@ func TestWaitForShutdown(t *testing.T) { t.Fatalf("waitForShutdown(error) = %v, want %v", err, want) } } - -func TestValidateConfigAliasStillValidates(t *testing.T) { - session.RegisterDefaults() - cfg := config{ - mode: "srv", - link: "direct", - transport: "datachannel", - provider: "jazz", - clientID: "client", - keyHex: "key", - dnsServer: "1.1.1.1:53", - videoCodec: "qrcode", - } - if err := session.Validate(toSessionConfig(cfg)); err != nil { - t.Fatalf("Validate(toSessionConfig(alias)) error = %v", err) - } -} diff --git a/docker-compose.server.yml b/docker-compose.server.yml index a677da1..5737d07 100644 --- a/docker-compose.server.yml +++ b/docker-compose.server.yml @@ -6,7 +6,7 @@ services: container_name: olcrtc-server restart: unless-stopped environment: - OLCRTC_PROVIDER: "${OLCRTC_PROVIDER:?set OLCRTC_PROVIDER (telemost, jazz, wbstream)}" + OLCRTC_CARRIER: "${OLCRTC_CARRIER:?set OLCRTC_CARRIER (telemost, jazz, wbstream)}" OLCRTC_ROOM_ID: "${OLCRTC_ROOM_ID:?set OLCRTC_ROOM_ID}" OLCRTC_KEY: "${OLCRTC_KEY:-}" OLCRTC_DNS: "${OLCRTC_DNS:-1.1.1.1:53}" diff --git a/docs/uri.md b/docs/uri.md index 28b344f..f834e96 100644 --- a/docs/uri.md +++ b/docs/uri.md @@ -30,7 +30,7 @@ olcrtc://?@#%$ | Поле | Значение | |------|----------| -| `` | Имя carrier/provider, например `telemost`, `jazz`, `wbstream` | +| `` | Имя carrier, например `telemost`, `jazz`, `wbstream` | | `` | Имя транспорта, например `datachannel`, `vp8channel`, `seichannel`, `videochannel` | | `` | Идентификатор комнаты или carrier-specific room URL/ID | | `` | Ключ шифрования в hex, обычно 64 символа (`32` байта) | diff --git a/internal/app/session/session.go b/internal/app/session/session.go index 4b660d8..befbf35 100644 --- a/internal/app/session/session.go +++ b/internal/app/session/session.go @@ -121,7 +121,7 @@ type Config struct { SEIAckTimeoutMS int } -// RegisterDefaults registers built-in providers and transports. +// RegisterDefaults registers built-in carriers and transports. func RegisterDefaults() { builtin.Register() link.Register("direct", direct.New) diff --git a/internal/carrier/builtin/provider_adapter.go b/internal/carrier/builtin/provider_adapter.go new file mode 100644 index 0000000..ced340e --- /dev/null +++ b/internal/carrier/builtin/provider_adapter.go @@ -0,0 +1,121 @@ +package builtin + +import ( + "context" + "fmt" + + "github.com/openlibrecommunity/olcrtc/internal/carrier" + "github.com/openlibrecommunity/olcrtc/internal/provider" + "github.com/pion/webrtc/v4" +) + +type providerSession struct { + provider provider.Provider +} + +func (s *providerSession) Capabilities() carrier.Capabilities { + caps := carrier.Capabilities{ByteStream: true} + _, caps.VideoTrack = s.provider.(videoTrackProvider) + return caps +} + +func (s *providerSession) OpenByteStream() (carrier.ByteStream, error) { + return &providerByteStream{provider: s.provider}, nil +} + +func (s *providerSession) OpenVideoTrack() (carrier.VideoTrack, error) { + vtp, ok := s.provider.(videoTrackProvider) + if !ok { + return nil, carrier.ErrVideoTrackUnsupported + } + return &providerVideoTrack{provider: vtp}, nil +} + +type videoTrackProvider interface { + provider.Provider + provider.VideoTrackCapable +} + +type providerByteStream struct { + provider provider.Provider +} + +func (p *providerByteStream) Connect(ctx context.Context) error { + if err := p.provider.Connect(ctx); err != nil { + return fmt.Errorf("connect: %w", err) + } + return nil +} + +func (p *providerByteStream) Send(data []byte) error { + if err := p.provider.Send(data); err != nil { + return fmt.Errorf("send: %w", err) + } + return nil +} + +func (p *providerByteStream) Close() error { + if err := p.provider.Close(); err != nil { + return fmt.Errorf("close: %w", err) + } + return nil +} + +func (p *providerByteStream) SetReconnectCallback(cb func()) { + p.provider.SetReconnectCallback(func(_ *webrtc.DataChannel) { + if cb != nil { + cb() + } + }) +} + +func (p *providerByteStream) SetShouldReconnect(fn func() bool) { p.provider.SetShouldReconnect(fn) } +func (p *providerByteStream) SetEndedCallback(cb func(string)) { p.provider.SetEndedCallback(cb) } +func (p *providerByteStream) WatchConnection(ctx context.Context) { + p.provider.WatchConnection(ctx) +} +func (p *providerByteStream) CanSend() bool { return p.provider.CanSend() } + +type providerVideoTrack struct { + provider videoTrackProvider +} + +func (v *providerVideoTrack) Connect(ctx context.Context) error { + if err := v.provider.Connect(ctx); err != nil { + return fmt.Errorf("connect: %w", err) + } + return nil +} + +func (v *providerVideoTrack) Close() error { + if err := v.provider.Close(); err != nil { + return fmt.Errorf("close: %w", err) + } + return nil +} + +func (v *providerVideoTrack) SetReconnectCallback(cb func()) { + v.provider.SetReconnectCallback(func(_ *webrtc.DataChannel) { + if cb != nil { + cb() + } + }) +} + +func (v *providerVideoTrack) SetShouldReconnect(fn func() bool) { v.provider.SetShouldReconnect(fn) } +func (v *providerVideoTrack) SetEndedCallback(cb func(string)) { v.provider.SetEndedCallback(cb) } +func (v *providerVideoTrack) WatchConnection(ctx context.Context) { + v.provider.WatchConnection(ctx) +} +func (v *providerVideoTrack) CanSend() bool { return v.provider.CanSend() } + +func (v *providerVideoTrack) AddTrack(track webrtc.TrackLocal) error { + if err := v.provider.AddVideoTrack(track); err != nil { + return fmt.Errorf("add track: %w", err) + } + return nil +} + +func (v *providerVideoTrack) SetTrackHandler(cb func(*webrtc.TrackRemote, *webrtc.RTPReceiver)) { + v.provider.SetVideoTrackHandler(cb) +} diff --git a/internal/carrier/builtin/provider_adapter_test.go b/internal/carrier/builtin/provider_adapter_test.go new file mode 100644 index 0000000..908b27f --- /dev/null +++ b/internal/carrier/builtin/provider_adapter_test.go @@ -0,0 +1,198 @@ +package builtin + +import ( + "context" + "errors" + "testing" + + "github.com/openlibrecommunity/olcrtc/internal/carrier" + "github.com/pion/webrtc/v4" +) + +type stubProvider struct { + connectErr error + sendErr error + closeErr error + canSend bool + reconnectCallback func(*webrtc.DataChannel) + shouldReconnect func() bool + endedCallback func(string) + watchCalled bool + addTrackErr error + trackHandlerCalled bool +} + +func (s *stubProvider) Connect(context.Context) error { return s.connectErr } +func (s *stubProvider) Send([]byte) error { return s.sendErr } +func (s *stubProvider) Close() error { return s.closeErr } +func (s *stubProvider) SetReconnectCallback(cb func(*webrtc.DataChannel)) { s.reconnectCallback = cb } +func (s *stubProvider) SetShouldReconnect(fn func() bool) { s.shouldReconnect = fn } +func (s *stubProvider) SetEndedCallback(cb func(string)) { s.endedCallback = cb } +func (s *stubProvider) WatchConnection(context.Context) { s.watchCalled = true } +func (s *stubProvider) CanSend() bool { return s.canSend } +func (s *stubProvider) GetSendQueue() chan []byte { return nil } +func (s *stubProvider) GetBufferedAmount() uint64 { return 0 } +func (s *stubProvider) AddVideoTrack(webrtc.TrackLocal) error { return s.addTrackErr } +func (s *stubProvider) SetVideoTrackHandler(func(*webrtc.TrackRemote, *webrtc.RTPReceiver)) { + s.trackHandlerCalled = true +} + +type plainProvider struct { + connectErr error + sendErr error + closeErr error + canSend bool + reconnectCallback func(*webrtc.DataChannel) + shouldReconnect func() bool + endedCallback func(string) + watchCalled bool +} + +func (p *plainProvider) Connect(context.Context) error { return p.connectErr } +func (p *plainProvider) Send([]byte) error { return p.sendErr } +func (p *plainProvider) Close() error { return p.closeErr } +func (p *plainProvider) SetReconnectCallback(cb func(*webrtc.DataChannel)) { p.reconnectCallback = cb } +func (p *plainProvider) SetShouldReconnect(fn func() bool) { p.shouldReconnect = fn } +func (p *plainProvider) SetEndedCallback(cb func(string)) { p.endedCallback = cb } +func (p *plainProvider) WatchConnection(context.Context) { p.watchCalled = true } +func (p *plainProvider) CanSend() bool { return p.canSend } +func (p *plainProvider) GetSendQueue() chan []byte { return nil } +func (p *plainProvider) GetBufferedAmount() uint64 { return 0 } + +func TestProviderSessionOpenVideoTrackUnsupported(t *testing.T) { + sess := &providerSession{provider: &plainProvider{}} + + caps := sess.Capabilities() + if !caps.ByteStream || caps.VideoTrack { + t.Fatalf("Capabilities() = %+v, want byte true and video false", caps) + } + + _, err := sess.OpenVideoTrack() + if !errors.Is(err, carrier.ErrVideoTrackUnsupported) { + t.Fatalf("OpenVideoTrack() error = %v, want %v", err, carrier.ErrVideoTrackUnsupported) + } +} + +func TestProviderByteStreamWrapsProviderAndCallbacks(t *testing.T) { + prov := &stubProvider{canSend: true} + stream := &providerByteStream{provider: prov} + + called := false + stream.SetReconnectCallback(func() { called = true }) + if prov.reconnectCallback == nil { + t.Fatal("SetReconnectCallback() did not install provider callback") + } + prov.reconnectCallback(nil) + if !called { + t.Fatal("reconnect callback was not adapted") + } + + reconnectAllowed := false + stream.SetShouldReconnect(func() bool { reconnectAllowed = true; return true }) + if prov.shouldReconnect == nil || !prov.shouldReconnect() || !reconnectAllowed { + t.Fatal("SetShouldReconnect() was not forwarded") + } + + ended := "" + stream.SetEndedCallback(func(reason string) { ended = reason }) + if prov.endedCallback == nil { + t.Fatal("SetEndedCallback() was not forwarded") + } + prov.endedCallback("bye") + if ended != "bye" { + t.Fatalf("ended callback reason = %q, want bye", ended) + } + + stream.WatchConnection(context.Background()) + if !prov.watchCalled { + t.Fatal("WatchConnection() was not forwarded") + } + if !stream.CanSend() { + t.Fatal("CanSend() = false, want true") + } +} + +func TestProviderByteStreamWrapsErrors(t *testing.T) { + prov := &stubProvider{ + connectErr: errors.New("connect boom"), + sendErr: errors.New("send boom"), + closeErr: errors.New("close boom"), + } + stream := &providerByteStream{provider: prov} + + if err := stream.Connect(context.Background()); err == nil || err.Error() != "connect: connect boom" { + t.Fatalf("Connect() error = %v", err) + } + if err := stream.Send([]byte("x")); err == nil || err.Error() != "send: send boom" { + t.Fatalf("Send() error = %v", err) + } + if err := stream.Close(); err == nil || err.Error() != "close: close boom" { + t.Fatalf("Close() error = %v", err) + } +} + +func TestProviderSessionOpenByteStreamAndVideoTrack(t *testing.T) { + prov := &stubProvider{canSend: true} + sess := &providerSession{provider: prov} + + stream, err := sess.OpenByteStream() + if err != nil { + t.Fatalf("OpenByteStream() error = %v", err) + } + if !stream.CanSend() { + t.Fatal("byte stream CanSend() = false, want true") + } + + video, err := sess.OpenVideoTrack() + if err != nil { + t.Fatalf("OpenVideoTrack() error = %v", err) + } + if err := video.Connect(context.Background()); err != nil { + t.Fatalf("video Connect() error = %v", err) + } + if err := video.Close(); err != nil { + t.Fatalf("video Close() error = %v", err) + } + video.SetShouldReconnect(func() bool { return true }) + video.SetEndedCallback(func(string) {}) + video.WatchConnection(context.Background()) + if !video.CanSend() || prov.shouldReconnect == nil || prov.endedCallback == nil || !prov.watchCalled { + t.Fatal("video adapter did not forward calls") + } +} + +func TestProviderVideoTrackWrapsOperations(t *testing.T) { + prov := &stubProvider{canSend: true, addTrackErr: errors.New("track boom")} + track := &providerVideoTrack{provider: prov} + + called := false + track.SetReconnectCallback(func() { called = true }) + prov.reconnectCallback(nil) + if !called { + t.Fatal("reconnect callback was not adapted") + } + + track.SetTrackHandler(func(*webrtc.TrackRemote, *webrtc.RTPReceiver) {}) + if !prov.trackHandlerCalled { + t.Fatal("SetTrackHandler() was not forwarded") + } + + if err := track.AddTrack(nil); err == nil || err.Error() != "add track: track boom" { + t.Fatalf("AddTrack() error = %v", err) + } +} + +func TestProviderVideoTrackWrapsConnectCloseErrors(t *testing.T) { + prov := &stubProvider{ + connectErr: errors.New("connect boom"), + closeErr: errors.New("close boom"), + } + track := &providerVideoTrack{provider: prov} + + if err := track.Connect(context.Background()); err == nil || err.Error() != "connect: connect boom" { + t.Fatalf("Connect() error = %v", err) + } + if err := track.Close(); err == nil || err.Error() != "close: close boom" { + t.Fatalf("Close() error = %v", err) + } +} diff --git a/internal/carrier/builtin/register.go b/internal/carrier/builtin/register.go index 6de5f15..7d955c6 100644 --- a/internal/carrier/builtin/register.go +++ b/internal/carrier/builtin/register.go @@ -2,15 +2,37 @@ package builtin import ( + "context" + "github.com/openlibrecommunity/olcrtc/internal/carrier" + "github.com/openlibrecommunity/olcrtc/internal/provider" "github.com/openlibrecommunity/olcrtc/internal/provider/jazz" "github.com/openlibrecommunity/olcrtc/internal/provider/telemost" "github.com/openlibrecommunity/olcrtc/internal/provider/wbstream" ) -// Register wires the built-in legacy carriers into the carrier registry. +type providerFactory func(context.Context, provider.Config) (provider.Provider, error) + +// Register wires the built-in carriers into the carrier registry. func Register() { - carrier.RegisterLegacy("jazz", jazz.New) - carrier.RegisterLegacy("telemost", telemost.New) - carrier.RegisterLegacy("wbstream", wbstream.New) + registerProvider("jazz", jazz.New) + registerProvider("telemost", telemost.New) + registerProvider("wbstream", wbstream.New) +} + +func registerProvider(name string, factory providerFactory) { + carrier.Register(name, func(ctx context.Context, cfg carrier.Config) (carrier.Session, error) { + prov, err := factory(ctx, provider.Config{ + RoomURL: cfg.RoomURL, + Name: cfg.Name, + OnData: cfg.OnData, + DNSServer: cfg.DNSServer, + ProxyAddr: cfg.ProxyAddr, + ProxyPort: cfg.ProxyPort, + }) + if err != nil { + return nil, err + } + return &providerSession{provider: prov}, nil + }) } diff --git a/internal/carrier/bytestream.go b/internal/carrier/bytestream.go index e394159..6803e03 100644 --- a/internal/carrier/bytestream.go +++ b/internal/carrier/bytestream.go @@ -2,9 +2,7 @@ package carrier import ( "context" - "fmt" - "github.com/openlibrecommunity/olcrtc/internal/provider" "github.com/pion/webrtc/v4" ) @@ -32,110 +30,3 @@ type VideoTrack interface { AddTrack(track webrtc.TrackLocal) error SetTrackHandler(cb func(*webrtc.TrackRemote, *webrtc.RTPReceiver)) } - -type videoTrackProvider interface { - provider.Provider - provider.VideoTrackCapable -} - -type legacySession struct { - provider provider.Provider -} - -// Capabilities reports the transport primitives supported by the legacy carrier. -func (s *legacySession) Capabilities() Capabilities { - caps := Capabilities{ByteStream: true} - _, caps.VideoTrack = s.provider.(videoTrackProvider) - return caps -} - -// OpenByteStream adapts the legacy provider to a generic byte stream capability. -func (s *legacySession) OpenByteStream() (ByteStream, error) { - return &legacyByteStream{provider: s.provider}, nil -} - -// OpenVideoTrack adapts a legacy provider to the generic video track capability. -func (s *legacySession) OpenVideoTrack() (VideoTrack, error) { - vtp, ok := s.provider.(videoTrackProvider) - if !ok { - return nil, ErrVideoTrackUnsupported - } - return &legacyVideoTrack{provider: vtp}, nil -} - -type legacyByteStream struct { - provider provider.Provider -} - -func (p *legacyByteStream) Connect(ctx context.Context) error { - if err := p.provider.Connect(ctx); err != nil { - return fmt.Errorf("connect: %w", err) - } - return nil -} -func (p *legacyByteStream) Send(data []byte) error { - if err := p.provider.Send(data); err != nil { - return fmt.Errorf("send: %w", err) - } - return nil -} -func (p *legacyByteStream) Close() error { - if err := p.provider.Close(); err != nil { - return fmt.Errorf("close: %w", err) - } - return nil -} - -func (p *legacyByteStream) SetReconnectCallback(cb func()) { - p.provider.SetReconnectCallback(func(_ *webrtc.DataChannel) { - if cb != nil { - cb() - } - }) -} - -func (p *legacyByteStream) SetShouldReconnect(fn func() bool) { p.provider.SetShouldReconnect(fn) } -func (p *legacyByteStream) SetEndedCallback(cb func(string)) { p.provider.SetEndedCallback(cb) } -func (p *legacyByteStream) WatchConnection(ctx context.Context) { - p.provider.WatchConnection(ctx) -} -func (p *legacyByteStream) CanSend() bool { return p.provider.CanSend() } - -type legacyVideoTrack struct { - provider videoTrackProvider -} - -func (v *legacyVideoTrack) Connect(ctx context.Context) error { - if err := v.provider.Connect(ctx); err != nil { - return fmt.Errorf("connect: %w", err) - } - return nil -} -func (v *legacyVideoTrack) Close() error { - if err := v.provider.Close(); err != nil { - return fmt.Errorf("close: %w", err) - } - return nil -} -func (v *legacyVideoTrack) SetShouldReconnect(fn func() bool) { v.provider.SetShouldReconnect(fn) } -func (v *legacyVideoTrack) SetEndedCallback(cb func(string)) { v.provider.SetEndedCallback(cb) } -func (v *legacyVideoTrack) WatchConnection(ctx context.Context) { - v.provider.WatchConnection(ctx) -} -func (v *legacyVideoTrack) CanSend() bool { return v.provider.CanSend() } -func (v *legacyVideoTrack) AddTrack(track webrtc.TrackLocal) error { - if err := v.provider.AddVideoTrack(track); err != nil { - return fmt.Errorf("add track: %w", err) - } - return nil -} -func (v *legacyVideoTrack) SetTrackHandler(cb func(*webrtc.TrackRemote, *webrtc.RTPReceiver)) { - v.provider.SetVideoTrackHandler(cb) -} -func (v *legacyVideoTrack) SetReconnectCallback(cb func()) { - v.provider.SetReconnectCallback(func(_ *webrtc.DataChannel) { - if cb != nil { - cb() - } - }) -} diff --git a/internal/carrier/carrier.go b/internal/carrier/carrier.go index ab98945..04bf68c 100644 --- a/internal/carrier/carrier.go +++ b/internal/carrier/carrier.go @@ -4,8 +4,6 @@ package carrier import ( "context" "errors" - - "github.com/openlibrecommunity/olcrtc/internal/provider" ) var ( @@ -59,24 +57,6 @@ func Register(name string, factory Factory) { registry[name] = factory } -// RegisterLegacy adapts an existing provider factory into the carrier registry. -func RegisterLegacy(name string, factory provider.Factory) { - Register(name, func(ctx context.Context, cfg Config) (Session, error) { - legacy, err := factory(ctx, provider.Config{ - RoomURL: cfg.RoomURL, - Name: cfg.Name, - OnData: cfg.OnData, - DNSServer: cfg.DNSServer, - ProxyAddr: cfg.ProxyAddr, - ProxyPort: cfg.ProxyPort, - }) - if err != nil { - return nil, err - } - return &legacySession{provider: legacy}, nil - }) -} - // New creates a carrier session by name. func New(ctx context.Context, name string, cfg Config) (Session, error) { factory, ok := registry[name] diff --git a/internal/carrier/carrier_test.go b/internal/carrier/carrier_test.go index 6299ba5..9244d4b 100644 --- a/internal/carrier/carrier_test.go +++ b/internal/carrier/carrier_test.go @@ -5,61 +5,14 @@ import ( "errors" "reflect" "testing" - - "github.com/openlibrecommunity/olcrtc/internal/provider" - "github.com/pion/webrtc/v4" ) -type stubProvider struct { - connectErr error - sendErr error - closeErr error - canSend bool - reconnectCallback func(*webrtc.DataChannel) - shouldReconnect func() bool - endedCallback func(string) - watchCalled bool - addTrackErr error - trackHandlerCalled bool -} +type stubSession struct{} -func (s *stubProvider) Connect(context.Context) error { return s.connectErr } -func (s *stubProvider) Send([]byte) error { return s.sendErr } -func (s *stubProvider) Close() error { return s.closeErr } -func (s *stubProvider) SetReconnectCallback(cb func(*webrtc.DataChannel)) { s.reconnectCallback = cb } -func (s *stubProvider) SetShouldReconnect(fn func() bool) { s.shouldReconnect = fn } -func (s *stubProvider) SetEndedCallback(cb func(string)) { s.endedCallback = cb } -func (s *stubProvider) WatchConnection(context.Context) { s.watchCalled = true } -func (s *stubProvider) CanSend() bool { return s.canSend } -func (s *stubProvider) GetSendQueue() chan []byte { return nil } -func (s *stubProvider) GetBufferedAmount() uint64 { return 0 } -func (s *stubProvider) AddVideoTrack(webrtc.TrackLocal) error { return s.addTrackErr } -func (s *stubProvider) SetVideoTrackHandler(func(*webrtc.TrackRemote, *webrtc.RTPReceiver)) { - s.trackHandlerCalled = true +func (s *stubSession) Capabilities() Capabilities { + return Capabilities{ByteStream: true, VideoTrack: true} } -type plainProvider struct { - connectErr error - sendErr error - closeErr error - canSend bool - reconnectCallback func(*webrtc.DataChannel) - shouldReconnect func() bool - endedCallback func(string) - watchCalled bool -} - -func (p *plainProvider) Connect(context.Context) error { return p.connectErr } -func (p *plainProvider) Send([]byte) error { return p.sendErr } -func (p *plainProvider) Close() error { return p.closeErr } -func (p *plainProvider) SetReconnectCallback(cb func(*webrtc.DataChannel)) { p.reconnectCallback = cb } -func (p *plainProvider) SetShouldReconnect(fn func() bool) { p.shouldReconnect = fn } -func (p *plainProvider) SetEndedCallback(cb func(string)) { p.endedCallback = cb } -func (p *plainProvider) WatchConnection(context.Context) { p.watchCalled = true } -func (p *plainProvider) CanSend() bool { return p.canSend } -func (p *plainProvider) GetSendQueue() chan []byte { return nil } -func (p *plainProvider) GetBufferedAmount() uint64 { return 0 } - func snapshotCarrierRegistry() map[string]Factory { out := make(map[string]Factory, len(registry)) for k, v := range registry { @@ -75,18 +28,18 @@ func restoreCarrierRegistry(src map[string]Factory) { } } -func TestRegisterLegacyAndAvailable(t *testing.T) { +func TestRegisterAndAvailable(t *testing.T) { old := snapshotCarrierRegistry() t.Cleanup(func() { restoreCarrierRegistry(old) }) - RegisterLegacy("legacy-test", func(_ context.Context, cfg provider.Config) (provider.Provider, error) { + Register("test-carrier", func(_ context.Context, cfg Config) (Session, error) { if cfg.Name != "peer" { - t.Fatalf("provider config name = %q, want peer", cfg.Name) + t.Fatalf("carrier config name = %q, want peer", cfg.Name) } - return &stubProvider{canSend: true}, nil + return &stubSession{}, nil }) - sess, err := New(context.Background(), "legacy-test", Config{Name: "peer"}) + sess, err := New(context.Background(), "test-carrier", Config{Name: "peer"}) if err != nil { t.Fatalf("New() error = %v", err) } @@ -96,8 +49,8 @@ func TestRegisterLegacyAndAvailable(t *testing.T) { t.Fatalf("Capabilities() = %+v, want byte and video true", caps) } - if !reflect.DeepEqual(Available(), []string{"legacy-test"}) { - t.Fatalf("Available() = %#v, want %#v", Available(), []string{"legacy-test"}) + if !reflect.DeepEqual(Available(), []string{"test-carrier"}) { + t.Fatalf("Available() = %#v, want %#v", Available(), []string{"test-carrier"}) } } @@ -111,141 +64,3 @@ func TestNewReturnsErrCarrierNotFound(t *testing.T) { t.Fatalf("New() error = %v, want %v", err, ErrCarrierNotFound) } } - -func TestLegacySessionOpenVideoTrackUnsupported(t *testing.T) { - sess := &legacySession{provider: &plainProvider{}} - - caps := sess.Capabilities() - if !caps.ByteStream || caps.VideoTrack { - t.Fatalf("Capabilities() = %+v, want byte true and video false", caps) - } - - _, err := sess.OpenVideoTrack() - if !errors.Is(err, ErrVideoTrackUnsupported) { - t.Fatalf("OpenVideoTrack() error = %v, want %v", err, ErrVideoTrackUnsupported) - } -} - -func TestLegacyByteStreamWrapsProviderAndCallbacks(t *testing.T) { - prov := &stubProvider{canSend: true} - stream := &legacyByteStream{provider: prov} - - called := false - stream.SetReconnectCallback(func() { called = true }) - if prov.reconnectCallback == nil { - t.Fatal("SetReconnectCallback() did not install provider callback") - } - prov.reconnectCallback(nil) - if !called { - t.Fatal("reconnect callback was not adapted") - } - - reconnectAllowed := false - stream.SetShouldReconnect(func() bool { reconnectAllowed = true; return true }) - if prov.shouldReconnect == nil || !prov.shouldReconnect() || !reconnectAllowed { - t.Fatal("SetShouldReconnect() was not forwarded") - } - - ended := "" - stream.SetEndedCallback(func(reason string) { ended = reason }) - if prov.endedCallback == nil { - t.Fatal("SetEndedCallback() was not forwarded") - } - prov.endedCallback("bye") - if ended != "bye" { - t.Fatalf("ended callback reason = %q, want bye", ended) - } - - stream.WatchConnection(context.Background()) - if !prov.watchCalled { - t.Fatal("WatchConnection() was not forwarded") - } - if !stream.CanSend() { - t.Fatal("CanSend() = false, want true") - } -} - -func TestLegacyByteStreamWrapsErrors(t *testing.T) { - prov := &stubProvider{ - connectErr: errors.New("connect boom"), - sendErr: errors.New("send boom"), - closeErr: errors.New("close boom"), - } - stream := &legacyByteStream{provider: prov} - - if err := stream.Connect(context.Background()); err == nil || err.Error() != "connect: connect boom" { - t.Fatalf("Connect() error = %v", err) - } - if err := stream.Send([]byte("x")); err == nil || err.Error() != "send: send boom" { - t.Fatalf("Send() error = %v", err) - } - if err := stream.Close(); err == nil || err.Error() != "close: close boom" { - t.Fatalf("Close() error = %v", err) - } -} - -func TestLegacySessionOpenByteStreamAndVideoTrack(t *testing.T) { - prov := &stubProvider{canSend: true} - sess := &legacySession{provider: prov} - - stream, err := sess.OpenByteStream() - if err != nil { - t.Fatalf("OpenByteStream() error = %v", err) - } - if !stream.CanSend() { - t.Fatal("byte stream CanSend() = false, want true") - } - - video, err := sess.OpenVideoTrack() - if err != nil { - t.Fatalf("OpenVideoTrack() error = %v", err) - } - if err := video.Connect(context.Background()); err != nil { - t.Fatalf("video Connect() error = %v", err) - } - if err := video.Close(); err != nil { - t.Fatalf("video Close() error = %v", err) - } - video.SetShouldReconnect(func() bool { return true }) - video.SetEndedCallback(func(string) {}) - video.WatchConnection(context.Background()) - if !video.CanSend() || prov.shouldReconnect == nil || prov.endedCallback == nil || !prov.watchCalled { - t.Fatal("video adapter did not forward calls") - } -} - -func TestLegacyVideoTrackWrapsOperations(t *testing.T) { - prov := &stubProvider{canSend: true, addTrackErr: errors.New("track boom")} - track := &legacyVideoTrack{provider: prov} - - called := false - track.SetReconnectCallback(func() { called = true }) - prov.reconnectCallback(nil) - if !called { - t.Fatal("reconnect callback was not adapted") - } - - track.SetTrackHandler(func(*webrtc.TrackRemote, *webrtc.RTPReceiver) {}) - if !prov.trackHandlerCalled { - t.Fatal("SetTrackHandler() was not forwarded") - } - - if err := track.AddTrack(nil); err == nil || err.Error() != "add track: track boom" { - t.Fatalf("AddTrack() error = %v", err) - } -} - -func TestLegacyVideoTrackWrapsConnectCloseErrors(t *testing.T) { - prov := &stubProvider{ - connectErr: errors.New("connect boom"), - closeErr: errors.New("close boom"), - } - track := &legacyVideoTrack{provider: prov} - - if err := track.Connect(context.Background()); err == nil || err.Error() != "connect: connect boom" { - t.Fatalf("Connect() error = %v", err) - } - if err := track.Close(); err == nil || err.Error() != "close: close boom" { - t.Fatalf("Close() error = %v", err) - } -} diff --git a/internal/provider/provider.go b/internal/provider/provider.go index 031835d..600a90d 100644 --- a/internal/provider/provider.go +++ b/internal/provider/provider.go @@ -9,8 +9,6 @@ import ( ) var ( - // ErrProviderNotFound is returned when a requested provider is not registered. - ErrProviderNotFound = errors.New("provider not found") // ErrDataChannelTimeout is returned when the DataChannel fails to open within the timeout period. ErrDataChannelTimeout = errors.New("datachannel timeout") // ErrDataChannelNotReady is returned when attempting to send data before the DataChannel is open. @@ -50,34 +48,3 @@ type Config struct { ProxyAddr string ProxyPort int } - -// Factory is a function that creates a new Provider instance. -type Factory func(ctx context.Context, cfg Config) (Provider, error) - -// registry holds all registered provider factories. -// -//nolint:gochecknoglobals // Global registry is required for provider discovery. -var registry = make(map[string]Factory) - -// Register adds a new provider factory to the registry. -func Register(name string, factory Factory) { - registry[name] = factory -} - -// New creates a new Provider instance by name. -func New(ctx context.Context, name string, cfg Config) (Provider, error) { - factory, ok := registry[name] - if !ok { - return nil, ErrProviderNotFound - } - return factory(ctx, cfg) -} - -// Available returns a list of registered provider names. -func Available() []string { - names := make([]string, 0, len(registry)) - for name := range registry { - names = append(names, name) - } - return names -} diff --git a/internal/provider/provider_test.go b/internal/provider/provider_test.go deleted file mode 100644 index 3e080cb..0000000 --- a/internal/provider/provider_test.go +++ /dev/null @@ -1,75 +0,0 @@ -package provider - -import ( - "context" - "errors" - "reflect" - "testing" - - "github.com/pion/webrtc/v4" -) - -type stubProvider struct{} - -func (s *stubProvider) Connect(context.Context) error { return nil } -func (s *stubProvider) Send([]byte) error { return nil } -func (s *stubProvider) Close() error { return nil } -func (s *stubProvider) SetReconnectCallback(func(*webrtc.DataChannel)) {} -func (s *stubProvider) SetShouldReconnect(func() bool) {} -func (s *stubProvider) SetEndedCallback(func(string)) {} -func (s *stubProvider) WatchConnection(context.Context) {} -func (s *stubProvider) CanSend() bool { return true } -func (s *stubProvider) GetSendQueue() chan []byte { return nil } -func (s *stubProvider) GetBufferedAmount() uint64 { return 0 } - -func snapshotProviderRegistry() map[string]Factory { - out := make(map[string]Factory, len(registry)) - for k, v := range registry { - out[k] = v - } - return out -} - -func restoreProviderRegistry(src map[string]Factory) { - registry = make(map[string]Factory, len(src)) - for k, v := range src { - registry[k] = v - } -} - -func TestNewAndAvailable(t *testing.T) { - old := snapshotProviderRegistry() - t.Cleanup(func() { restoreProviderRegistry(old) }) - - called := false - Register("test-provider", func(_ context.Context, cfg Config) (Provider, error) { - called = cfg.Name == "peer" - return &stubProvider{}, nil - }) - - got, err := New(context.Background(), "test-provider", Config{Name: "peer"}) - if err != nil { - t.Fatalf("New() error = %v", err) - } - if !called { - t.Fatal("factory did not receive config") - } - if _, ok := got.(*stubProvider); !ok { - t.Fatalf("New() returned %T, want *stubProvider", got) - } - - if !reflect.DeepEqual(Available(), []string{"test-provider"}) { - t.Fatalf("Available() = %#v, want %#v", Available(), []string{"test-provider"}) - } -} - -func TestNewReturnsErrProviderNotFound(t *testing.T) { - old := snapshotProviderRegistry() - t.Cleanup(func() { restoreProviderRegistry(old) }) - registry = map[string]Factory{} - - _, err := New(context.Background(), "missing", Config{}) - if !errors.Is(err, ErrProviderNotFound) { - t.Fatalf("New() error = %v, want %v", err, ErrProviderNotFound) - } -} diff --git a/internal/transport/datachannel/transport.go b/internal/transport/datachannel/transport.go index 965bb04..b361b3b 100644 --- a/internal/transport/datachannel/transport.go +++ b/internal/transport/datachannel/transport.go @@ -1,4 +1,4 @@ -// Package datachannel provides a transport backed by the current WebRTC providers. +// Package datachannel provides a transport backed by the current carriers. package datachannel import ( @@ -15,7 +15,7 @@ type streamTransport struct { stream carrier.ByteStream } -// New creates a datachannel transport backed by a carrier-specific provider. +// New creates a datachannel transport backed by a carrier. func New(ctx context.Context, cfg transport.Config) (transport.Transport, error) { session, err := carrier.New(ctx, cfg.Carrier, carrier.Config{ RoomURL: cfg.RoomURL, @@ -26,7 +26,7 @@ func New(ctx context.Context, cfg transport.Config) (transport.Transport, error) ProxyPort: cfg.ProxyPort, }) if err != nil { - return nil, fmt.Errorf("create provider transport: %w", err) + return nil, fmt.Errorf("create carrier transport: %w", err) } streamCapable, ok := session.(carrier.ByteStreamCapable) diff --git a/internal/transport/datachannel/transport_test.go b/internal/transport/datachannel/transport_test.go index b5a33ea..68d9efa 100644 --- a/internal/transport/datachannel/transport_test.go +++ b/internal/transport/datachannel/transport_test.go @@ -101,7 +101,7 @@ func TestNewErrorPaths(t *testing.T) { carrier.Register("datachannel-fail-create", func(context.Context, carrier.Config) (carrier.Session, error) { return nil, errors.New("boom") }) - if _, err := New(context.Background(), transport.Config{Carrier: "datachannel-fail-create"}); err == nil || err.Error() != "create provider transport: boom" { + if _, err := New(context.Background(), transport.Config{Carrier: "datachannel-fail-create"}); err == nil || err.Error() != "create carrier transport: boom" { t.Fatalf("New() error = %v", err) } diff --git a/internal/transport/seichannel/transport.go b/internal/transport/seichannel/transport.go index 4d7b521..bf026bc 100644 --- a/internal/transport/seichannel/transport.go +++ b/internal/transport/seichannel/transport.go @@ -97,7 +97,7 @@ type streamTransport struct { batchSize int } -// New creates a seichannel transport backed by a carrier-specific provider. +// New creates a seichannel transport backed by a carrier. func New(ctx context.Context, cfg transport.Config) (transport.Transport, error) { session, err := carrier.New(ctx, cfg.Carrier, carrier.Config{ RoomURL: cfg.RoomURL, @@ -108,7 +108,7 @@ func New(ctx context.Context, cfg transport.Config) (transport.Transport, error) ProxyPort: cfg.ProxyPort, }) if err != nil { - return nil, fmt.Errorf("create provider transport: %w", err) + return nil, fmt.Errorf("create carrier transport: %w", err) } videoCapable, ok := session.(carrier.VideoTrackCapable) diff --git a/internal/transport/seichannel/transport_unit_test.go b/internal/transport/seichannel/transport_unit_test.go index f7e026e..9ea68f2 100644 --- a/internal/transport/seichannel/transport_unit_test.go +++ b/internal/transport/seichannel/transport_unit_test.go @@ -114,7 +114,7 @@ func TestNewErrorPaths(t *testing.T) { carrier.Register("seichannel-create-fails", func(context.Context, carrier.Config) (carrier.Session, error) { return nil, errors.New("boom") }) - if _, err := New(context.Background(), transport.Config{Carrier: "seichannel-create-fails"}); err == nil || err.Error() != "create provider transport: boom" { + if _, err := New(context.Background(), transport.Config{Carrier: "seichannel-create-fails"}); err == nil || err.Error() != "create carrier transport: boom" { t.Fatalf("New() error = %v", err) } diff --git a/internal/transport/videochannel/transport.go b/internal/transport/videochannel/transport.go index 7d367c9..dc91c77 100644 --- a/internal/transport/videochannel/transport.go +++ b/internal/transport/videochannel/transport.go @@ -76,7 +76,7 @@ type streamTransport struct { idleFrameMu sync.Mutex } -// New creates a visual videochannel transport backed by a carrier-specific provider. +// New creates a visual videochannel transport backed by a carrier. func New(ctx context.Context, cfg transport.Config) (transport.Transport, error) { session, err := carrier.New(ctx, cfg.Carrier, carrier.Config{ RoomURL: cfg.RoomURL, @@ -87,7 +87,7 @@ func New(ctx context.Context, cfg transport.Config) (transport.Transport, error) ProxyPort: cfg.ProxyPort, }) if err != nil { - return nil, fmt.Errorf("create provider transport: %w", err) + return nil, fmt.Errorf("create carrier transport: %w", err) } videoCapable, ok := session.(carrier.VideoTrackCapable) diff --git a/internal/transport/videochannel/transport_unit_test.go b/internal/transport/videochannel/transport_unit_test.go index b729b05..3a257a3 100644 --- a/internal/transport/videochannel/transport_unit_test.go +++ b/internal/transport/videochannel/transport_unit_test.go @@ -104,7 +104,7 @@ func TestNewErrorPaths(t *testing.T) { carrier.Register("videochannel-create-fails", func(context.Context, carrier.Config) (carrier.Session, error) { return nil, errors.New("boom") }) - if _, err := New(context.Background(), transport.Config{Carrier: "videochannel-create-fails"}); err == nil || err.Error() != "create provider transport: boom" { + if _, err := New(context.Background(), transport.Config{Carrier: "videochannel-create-fails"}); err == nil || err.Error() != "create carrier transport: boom" { t.Fatalf("New() error = %v", err) } diff --git a/internal/transport/vp8channel/transport.go b/internal/transport/vp8channel/transport.go index 08b95ee..f0c314c 100644 --- a/internal/transport/vp8channel/transport.go +++ b/internal/transport/vp8channel/transport.go @@ -93,7 +93,7 @@ type streamTransport struct { reconnectFn func() } -// New creates a vp8channel transport backed by a carrier-specific provider. +// New creates a vp8channel transport backed by a carrier. func New(ctx context.Context, cfg transport.Config) (transport.Transport, error) { session, err := carrier.New(ctx, cfg.Carrier, carrier.Config{ RoomURL: cfg.RoomURL, @@ -104,7 +104,7 @@ func New(ctx context.Context, cfg transport.Config) (transport.Transport, error) ProxyPort: cfg.ProxyPort, }) if err != nil { - return nil, fmt.Errorf("create provider transport: %w", err) + return nil, fmt.Errorf("create carrier transport: %w", err) } videoCapable, ok := session.(carrier.VideoTrackCapable) diff --git a/internal/transport/vp8channel/transport_unit_test.go b/internal/transport/vp8channel/transport_unit_test.go index 5c79c18..0c8fe01 100644 --- a/internal/transport/vp8channel/transport_unit_test.go +++ b/internal/transport/vp8channel/transport_unit_test.go @@ -131,7 +131,7 @@ func TestNewErrorPaths(t *testing.T) { carrier.Register("vp8channel-create-fails", func(context.Context, carrier.Config) (carrier.Session, error) { return nil, errors.New("boom") }) - if _, err := New(context.Background(), transport.Config{Carrier: "vp8channel-create-fails"}); err == nil || err.Error() != "create provider transport: boom" { + if _, err := New(context.Background(), transport.Config{Carrier: "vp8channel-create-fails"}); err == nil || err.Error() != "create carrier transport: boom" { t.Fatalf("New() error = %v", err) } diff --git a/mobile/mobile.go b/mobile/mobile.go index bad7757..ba5f766 100644 --- a/mobile/mobile.go +++ b/mobile/mobile.go @@ -142,7 +142,7 @@ func SetDebug(enabled bool) { } // Start launches the olcRTC client in background. -// carrierName: carrier/provider name ("telemost", "jazz", "wbstream", "wbstream") +// carrierName: carrier name ("telemost", "jazz", "wbstream") // roomID: carrier-specific room ID // clientID: client identifier that must match the server's -client-id // keyHex: 64-char hex encryption key diff --git a/script/docker/olcrtc-entrypoint.sh b/script/docker/olcrtc-entrypoint.sh index f8c971e..eb55bb7 100644 --- a/script/docker/olcrtc-entrypoint.sh +++ b/script/docker/olcrtc-entrypoint.sh @@ -30,14 +30,14 @@ if [ "$#" -gt 0 ]; then fi mode="${OLCRTC_MODE:-srv}" -room_id="${OLCRTC_ROOM_ID:-${ROOM_ID:-}}" -carrier="${OLCRTC_CARRIER:-${OLCRTC_PROVIDER:-}}" +room_id="${OLCRTC_ROOM_ID:-}" +carrier="${OLCRTC_CARRIER:-}" transport="${OLCRTC_TRANSPORT:-}" link="${OLCRTC_LINK:-direct}" data_dir="${OLCRTC_DATA_DIR:-/usr/share/olcrtc}" dns_server="${OLCRTC_DNS:-1.1.1.1:53}" -key="${OLCRTC_KEY:-${KEY:-}}" -client_id="${OLCRTC_CLIENT_ID:-${CLIENT_ID:-}}" +key="${OLCRTC_KEY:-}" +client_id="${OLCRTC_CLIENT_ID:-}" key_file="${OLCRTC_KEY_FILE:-/var/lib/olcrtc/key.hex}" socks_proxy="${OLCRTC_SOCKS_PROXY:-}" socks_proxy_port="${OLCRTC_SOCKS_PROXY_PORT:-1080}"