refactor: remove legacy code

This commit is contained in:
zarazaex69
2026-05-07 14:28:32 +03:00
parent 99ee4d8bbc
commit 9c992d6fe4
24 changed files with 380 additions and 501 deletions

View File

@@ -42,7 +42,7 @@ USER olcrtc:olcrtc
WORKDIR /var/lib/olcrtc
ENV OLCRTC_MODE=srv \
OLCRTC_PROVIDER= \
OLCRTC_CARRIER= \
OLCRTC_DATA_DIR=/usr/share/olcrtc \
OLCRTC_DNS=1.1.1.1:53 \
OLCRTC_KEY_FILE=/var/lib/olcrtc/key.hex

View File

@@ -33,7 +33,6 @@ type config struct {
carrier string
roomID string
clientID string
provider string
socksPort int
socksHost string
keyHex string
@@ -140,7 +139,6 @@ func parseFlagsFrom(args []string, errorHandling flag.ErrorHandling) (config, er
fs.StringVar(&cfg.carrier, "carrier", "", "Carrier: telemost, jazz, wbstream")
fs.StringVar(&cfg.roomID, "id", "", "Room ID")
fs.StringVar(&cfg.clientID, "client-id", "", "Client ID: binds one srv to one cnc (required)")
fs.StringVar(&cfg.provider, "provider", "", "Deprecated alias for -carrier")
fs.IntVar(&cfg.socksPort, "socks-port", 0, "SOCKS5 port (client only)")
fs.StringVar(&cfg.socksHost, "socks-host", "", "SOCKS5 listen host (client only)")
fs.StringVar(&cfg.keyHex, "key", "", "Shared encryption key (hex)")
@@ -210,7 +208,7 @@ func toSessionConfig(cfg config) session.Config {
Mode: cfg.mode,
Link: cfg.link,
Transport: cfg.transport,
Carrier: firstNonEmpty(cfg.carrier, cfg.provider),
Carrier: cfg.carrier,
RoomID: cfg.roomID,
ClientID: cfg.clientID,
KeyHex: cfg.keyHex,
@@ -238,15 +236,6 @@ func toSessionConfig(cfg config) session.Config {
}
}
func firstNonEmpty(values ...string) string {
for _, value := range values {
if value != "" {
return value
}
}
return ""
}
func waitForShutdown(errCh <-chan error) error {
done := make(chan error, 1)
go func() {

View File

@@ -12,12 +12,12 @@ import (
"github.com/openlibrecommunity/olcrtc/internal/logger"
)
func TestToSessionConfigAndFirstNonEmpty(t *testing.T) {
func TestToSessionConfig(t *testing.T) {
cfg := config{
mode: "cnc",
link: "direct",
transport: "vp8channel",
provider: "jazz",
carrier: "jazz",
roomID: "room",
clientID: "client",
keyHex: "key",
@@ -52,18 +52,6 @@ func TestToSessionConfigAndFirstNonEmpty(t *testing.T) {
t.Fatalf("toSessionConfig() = %+v", got)
}
cfg.carrier = "telemost"
got = toSessionConfig(cfg)
if got.Carrier != "telemost" {
t.Fatalf("carrier precedence = %q, want telemost", got.Carrier)
}
if got := firstNonEmpty("", "", "x", "y"); got != "x" {
t.Fatalf("firstNonEmpty() = %q, want x", got)
}
if got := firstNonEmpty("", ""); got != "" {
t.Fatalf("firstNonEmpty(empty) = %q, want empty", got)
}
}
func TestParseFlagsFrom(t *testing.T) {
@@ -249,20 +237,3 @@ func TestWaitForShutdown(t *testing.T) {
t.Fatalf("waitForShutdown(error) = %v, want %v", err, want)
}
}
func TestValidateConfigAliasStillValidates(t *testing.T) {
session.RegisterDefaults()
cfg := config{
mode: "srv",
link: "direct",
transport: "datachannel",
provider: "jazz",
clientID: "client",
keyHex: "key",
dnsServer: "1.1.1.1:53",
videoCodec: "qrcode",
}
if err := session.Validate(toSessionConfig(cfg)); err != nil {
t.Fatalf("Validate(toSessionConfig(alias)) error = %v", err)
}
}

View File

@@ -6,7 +6,7 @@ services:
container_name: olcrtc-server
restart: unless-stopped
environment:
OLCRTC_PROVIDER: "${OLCRTC_PROVIDER:?set OLCRTC_PROVIDER (telemost, jazz, wbstream)}"
OLCRTC_CARRIER: "${OLCRTC_CARRIER:?set OLCRTC_CARRIER (telemost, jazz, wbstream)}"
OLCRTC_ROOM_ID: "${OLCRTC_ROOM_ID:?set OLCRTC_ROOM_ID}"
OLCRTC_KEY: "${OLCRTC_KEY:-}"
OLCRTC_DNS: "${OLCRTC_DNS:-1.1.1.1:53}"

View File

@@ -30,7 +30,7 @@ olcrtc://<Carrier>?<Transport>@<RoomID>#<EncryptionKey>%<ClientID>$<MIMO>
| Поле | Значение |
|------|----------|
| `<Carrier>` | Имя carrier/provider, например `telemost`, `jazz`, `wbstream` |
| `<Carrier>` | Имя carrier, например `telemost`, `jazz`, `wbstream` |
| `<Transport>` | Имя транспорта, например `datachannel`, `vp8channel`, `seichannel`, `videochannel` |
| `<RoomID>` | Идентификатор комнаты или carrier-specific room URL/ID |
| `<EncryptionKey>` | Ключ шифрования в hex, обычно 64 символа (`32` байта) |

View File

@@ -121,7 +121,7 @@ type Config struct {
SEIAckTimeoutMS int
}
// RegisterDefaults registers built-in providers and transports.
// RegisterDefaults registers built-in carriers and transports.
func RegisterDefaults() {
builtin.Register()
link.Register("direct", direct.New)

View File

@@ -0,0 +1,121 @@
package builtin
import (
"context"
"fmt"
"github.com/openlibrecommunity/olcrtc/internal/carrier"
"github.com/openlibrecommunity/olcrtc/internal/provider"
"github.com/pion/webrtc/v4"
)
type providerSession struct {
provider provider.Provider
}
func (s *providerSession) Capabilities() carrier.Capabilities {
caps := carrier.Capabilities{ByteStream: true}
_, caps.VideoTrack = s.provider.(videoTrackProvider)
return caps
}
func (s *providerSession) OpenByteStream() (carrier.ByteStream, error) {
return &providerByteStream{provider: s.provider}, nil
}
func (s *providerSession) OpenVideoTrack() (carrier.VideoTrack, error) {
vtp, ok := s.provider.(videoTrackProvider)
if !ok {
return nil, carrier.ErrVideoTrackUnsupported
}
return &providerVideoTrack{provider: vtp}, nil
}
type videoTrackProvider interface {
provider.Provider
provider.VideoTrackCapable
}
type providerByteStream struct {
provider provider.Provider
}
func (p *providerByteStream) Connect(ctx context.Context) error {
if err := p.provider.Connect(ctx); err != nil {
return fmt.Errorf("connect: %w", err)
}
return nil
}
func (p *providerByteStream) Send(data []byte) error {
if err := p.provider.Send(data); err != nil {
return fmt.Errorf("send: %w", err)
}
return nil
}
func (p *providerByteStream) Close() error {
if err := p.provider.Close(); err != nil {
return fmt.Errorf("close: %w", err)
}
return nil
}
func (p *providerByteStream) SetReconnectCallback(cb func()) {
p.provider.SetReconnectCallback(func(_ *webrtc.DataChannel) {
if cb != nil {
cb()
}
})
}
func (p *providerByteStream) SetShouldReconnect(fn func() bool) { p.provider.SetShouldReconnect(fn) }
func (p *providerByteStream) SetEndedCallback(cb func(string)) { p.provider.SetEndedCallback(cb) }
func (p *providerByteStream) WatchConnection(ctx context.Context) {
p.provider.WatchConnection(ctx)
}
func (p *providerByteStream) CanSend() bool { return p.provider.CanSend() }
type providerVideoTrack struct {
provider videoTrackProvider
}
func (v *providerVideoTrack) Connect(ctx context.Context) error {
if err := v.provider.Connect(ctx); err != nil {
return fmt.Errorf("connect: %w", err)
}
return nil
}
func (v *providerVideoTrack) Close() error {
if err := v.provider.Close(); err != nil {
return fmt.Errorf("close: %w", err)
}
return nil
}
func (v *providerVideoTrack) SetReconnectCallback(cb func()) {
v.provider.SetReconnectCallback(func(_ *webrtc.DataChannel) {
if cb != nil {
cb()
}
})
}
func (v *providerVideoTrack) SetShouldReconnect(fn func() bool) { v.provider.SetShouldReconnect(fn) }
func (v *providerVideoTrack) SetEndedCallback(cb func(string)) { v.provider.SetEndedCallback(cb) }
func (v *providerVideoTrack) WatchConnection(ctx context.Context) {
v.provider.WatchConnection(ctx)
}
func (v *providerVideoTrack) CanSend() bool { return v.provider.CanSend() }
func (v *providerVideoTrack) AddTrack(track webrtc.TrackLocal) error {
if err := v.provider.AddVideoTrack(track); err != nil {
return fmt.Errorf("add track: %w", err)
}
return nil
}
func (v *providerVideoTrack) SetTrackHandler(cb func(*webrtc.TrackRemote, *webrtc.RTPReceiver)) {
v.provider.SetVideoTrackHandler(cb)
}

View File

@@ -0,0 +1,198 @@
package builtin
import (
"context"
"errors"
"testing"
"github.com/openlibrecommunity/olcrtc/internal/carrier"
"github.com/pion/webrtc/v4"
)
type stubProvider struct {
connectErr error
sendErr error
closeErr error
canSend bool
reconnectCallback func(*webrtc.DataChannel)
shouldReconnect func() bool
endedCallback func(string)
watchCalled bool
addTrackErr error
trackHandlerCalled bool
}
func (s *stubProvider) Connect(context.Context) error { return s.connectErr }
func (s *stubProvider) Send([]byte) error { return s.sendErr }
func (s *stubProvider) Close() error { return s.closeErr }
func (s *stubProvider) SetReconnectCallback(cb func(*webrtc.DataChannel)) { s.reconnectCallback = cb }
func (s *stubProvider) SetShouldReconnect(fn func() bool) { s.shouldReconnect = fn }
func (s *stubProvider) SetEndedCallback(cb func(string)) { s.endedCallback = cb }
func (s *stubProvider) WatchConnection(context.Context) { s.watchCalled = true }
func (s *stubProvider) CanSend() bool { return s.canSend }
func (s *stubProvider) GetSendQueue() chan []byte { return nil }
func (s *stubProvider) GetBufferedAmount() uint64 { return 0 }
func (s *stubProvider) AddVideoTrack(webrtc.TrackLocal) error { return s.addTrackErr }
func (s *stubProvider) SetVideoTrackHandler(func(*webrtc.TrackRemote, *webrtc.RTPReceiver)) {
s.trackHandlerCalled = true
}
type plainProvider struct {
connectErr error
sendErr error
closeErr error
canSend bool
reconnectCallback func(*webrtc.DataChannel)
shouldReconnect func() bool
endedCallback func(string)
watchCalled bool
}
func (p *plainProvider) Connect(context.Context) error { return p.connectErr }
func (p *plainProvider) Send([]byte) error { return p.sendErr }
func (p *plainProvider) Close() error { return p.closeErr }
func (p *plainProvider) SetReconnectCallback(cb func(*webrtc.DataChannel)) { p.reconnectCallback = cb }
func (p *plainProvider) SetShouldReconnect(fn func() bool) { p.shouldReconnect = fn }
func (p *plainProvider) SetEndedCallback(cb func(string)) { p.endedCallback = cb }
func (p *plainProvider) WatchConnection(context.Context) { p.watchCalled = true }
func (p *plainProvider) CanSend() bool { return p.canSend }
func (p *plainProvider) GetSendQueue() chan []byte { return nil }
func (p *plainProvider) GetBufferedAmount() uint64 { return 0 }
func TestProviderSessionOpenVideoTrackUnsupported(t *testing.T) {
sess := &providerSession{provider: &plainProvider{}}
caps := sess.Capabilities()
if !caps.ByteStream || caps.VideoTrack {
t.Fatalf("Capabilities() = %+v, want byte true and video false", caps)
}
_, err := sess.OpenVideoTrack()
if !errors.Is(err, carrier.ErrVideoTrackUnsupported) {
t.Fatalf("OpenVideoTrack() error = %v, want %v", err, carrier.ErrVideoTrackUnsupported)
}
}
func TestProviderByteStreamWrapsProviderAndCallbacks(t *testing.T) {
prov := &stubProvider{canSend: true}
stream := &providerByteStream{provider: prov}
called := false
stream.SetReconnectCallback(func() { called = true })
if prov.reconnectCallback == nil {
t.Fatal("SetReconnectCallback() did not install provider callback")
}
prov.reconnectCallback(nil)
if !called {
t.Fatal("reconnect callback was not adapted")
}
reconnectAllowed := false
stream.SetShouldReconnect(func() bool { reconnectAllowed = true; return true })
if prov.shouldReconnect == nil || !prov.shouldReconnect() || !reconnectAllowed {
t.Fatal("SetShouldReconnect() was not forwarded")
}
ended := ""
stream.SetEndedCallback(func(reason string) { ended = reason })
if prov.endedCallback == nil {
t.Fatal("SetEndedCallback() was not forwarded")
}
prov.endedCallback("bye")
if ended != "bye" {
t.Fatalf("ended callback reason = %q, want bye", ended)
}
stream.WatchConnection(context.Background())
if !prov.watchCalled {
t.Fatal("WatchConnection() was not forwarded")
}
if !stream.CanSend() {
t.Fatal("CanSend() = false, want true")
}
}
func TestProviderByteStreamWrapsErrors(t *testing.T) {
prov := &stubProvider{
connectErr: errors.New("connect boom"),
sendErr: errors.New("send boom"),
closeErr: errors.New("close boom"),
}
stream := &providerByteStream{provider: prov}
if err := stream.Connect(context.Background()); err == nil || err.Error() != "connect: connect boom" {
t.Fatalf("Connect() error = %v", err)
}
if err := stream.Send([]byte("x")); err == nil || err.Error() != "send: send boom" {
t.Fatalf("Send() error = %v", err)
}
if err := stream.Close(); err == nil || err.Error() != "close: close boom" {
t.Fatalf("Close() error = %v", err)
}
}
func TestProviderSessionOpenByteStreamAndVideoTrack(t *testing.T) {
prov := &stubProvider{canSend: true}
sess := &providerSession{provider: prov}
stream, err := sess.OpenByteStream()
if err != nil {
t.Fatalf("OpenByteStream() error = %v", err)
}
if !stream.CanSend() {
t.Fatal("byte stream CanSend() = false, want true")
}
video, err := sess.OpenVideoTrack()
if err != nil {
t.Fatalf("OpenVideoTrack() error = %v", err)
}
if err := video.Connect(context.Background()); err != nil {
t.Fatalf("video Connect() error = %v", err)
}
if err := video.Close(); err != nil {
t.Fatalf("video Close() error = %v", err)
}
video.SetShouldReconnect(func() bool { return true })
video.SetEndedCallback(func(string) {})
video.WatchConnection(context.Background())
if !video.CanSend() || prov.shouldReconnect == nil || prov.endedCallback == nil || !prov.watchCalled {
t.Fatal("video adapter did not forward calls")
}
}
func TestProviderVideoTrackWrapsOperations(t *testing.T) {
prov := &stubProvider{canSend: true, addTrackErr: errors.New("track boom")}
track := &providerVideoTrack{provider: prov}
called := false
track.SetReconnectCallback(func() { called = true })
prov.reconnectCallback(nil)
if !called {
t.Fatal("reconnect callback was not adapted")
}
track.SetTrackHandler(func(*webrtc.TrackRemote, *webrtc.RTPReceiver) {})
if !prov.trackHandlerCalled {
t.Fatal("SetTrackHandler() was not forwarded")
}
if err := track.AddTrack(nil); err == nil || err.Error() != "add track: track boom" {
t.Fatalf("AddTrack() error = %v", err)
}
}
func TestProviderVideoTrackWrapsConnectCloseErrors(t *testing.T) {
prov := &stubProvider{
connectErr: errors.New("connect boom"),
closeErr: errors.New("close boom"),
}
track := &providerVideoTrack{provider: prov}
if err := track.Connect(context.Background()); err == nil || err.Error() != "connect: connect boom" {
t.Fatalf("Connect() error = %v", err)
}
if err := track.Close(); err == nil || err.Error() != "close: close boom" {
t.Fatalf("Close() error = %v", err)
}
}

View File

@@ -2,15 +2,37 @@
package builtin
import (
"context"
"github.com/openlibrecommunity/olcrtc/internal/carrier"
"github.com/openlibrecommunity/olcrtc/internal/provider"
"github.com/openlibrecommunity/olcrtc/internal/provider/jazz"
"github.com/openlibrecommunity/olcrtc/internal/provider/telemost"
"github.com/openlibrecommunity/olcrtc/internal/provider/wbstream"
)
// Register wires the built-in legacy carriers into the carrier registry.
type providerFactory func(context.Context, provider.Config) (provider.Provider, error)
// Register wires the built-in carriers into the carrier registry.
func Register() {
carrier.RegisterLegacy("jazz", jazz.New)
carrier.RegisterLegacy("telemost", telemost.New)
carrier.RegisterLegacy("wbstream", wbstream.New)
registerProvider("jazz", jazz.New)
registerProvider("telemost", telemost.New)
registerProvider("wbstream", wbstream.New)
}
func registerProvider(name string, factory providerFactory) {
carrier.Register(name, func(ctx context.Context, cfg carrier.Config) (carrier.Session, error) {
prov, err := factory(ctx, provider.Config{
RoomURL: cfg.RoomURL,
Name: cfg.Name,
OnData: cfg.OnData,
DNSServer: cfg.DNSServer,
ProxyAddr: cfg.ProxyAddr,
ProxyPort: cfg.ProxyPort,
})
if err != nil {
return nil, err
}
return &providerSession{provider: prov}, nil
})
}

View File

@@ -2,9 +2,7 @@ package carrier
import (
"context"
"fmt"
"github.com/openlibrecommunity/olcrtc/internal/provider"
"github.com/pion/webrtc/v4"
)
@@ -32,110 +30,3 @@ type VideoTrack interface {
AddTrack(track webrtc.TrackLocal) error
SetTrackHandler(cb func(*webrtc.TrackRemote, *webrtc.RTPReceiver))
}
type videoTrackProvider interface {
provider.Provider
provider.VideoTrackCapable
}
type legacySession struct {
provider provider.Provider
}
// Capabilities reports the transport primitives supported by the legacy carrier.
func (s *legacySession) Capabilities() Capabilities {
caps := Capabilities{ByteStream: true}
_, caps.VideoTrack = s.provider.(videoTrackProvider)
return caps
}
// OpenByteStream adapts the legacy provider to a generic byte stream capability.
func (s *legacySession) OpenByteStream() (ByteStream, error) {
return &legacyByteStream{provider: s.provider}, nil
}
// OpenVideoTrack adapts a legacy provider to the generic video track capability.
func (s *legacySession) OpenVideoTrack() (VideoTrack, error) {
vtp, ok := s.provider.(videoTrackProvider)
if !ok {
return nil, ErrVideoTrackUnsupported
}
return &legacyVideoTrack{provider: vtp}, nil
}
type legacyByteStream struct {
provider provider.Provider
}
func (p *legacyByteStream) Connect(ctx context.Context) error {
if err := p.provider.Connect(ctx); err != nil {
return fmt.Errorf("connect: %w", err)
}
return nil
}
func (p *legacyByteStream) Send(data []byte) error {
if err := p.provider.Send(data); err != nil {
return fmt.Errorf("send: %w", err)
}
return nil
}
func (p *legacyByteStream) Close() error {
if err := p.provider.Close(); err != nil {
return fmt.Errorf("close: %w", err)
}
return nil
}
func (p *legacyByteStream) SetReconnectCallback(cb func()) {
p.provider.SetReconnectCallback(func(_ *webrtc.DataChannel) {
if cb != nil {
cb()
}
})
}
func (p *legacyByteStream) SetShouldReconnect(fn func() bool) { p.provider.SetShouldReconnect(fn) }
func (p *legacyByteStream) SetEndedCallback(cb func(string)) { p.provider.SetEndedCallback(cb) }
func (p *legacyByteStream) WatchConnection(ctx context.Context) {
p.provider.WatchConnection(ctx)
}
func (p *legacyByteStream) CanSend() bool { return p.provider.CanSend() }
type legacyVideoTrack struct {
provider videoTrackProvider
}
func (v *legacyVideoTrack) Connect(ctx context.Context) error {
if err := v.provider.Connect(ctx); err != nil {
return fmt.Errorf("connect: %w", err)
}
return nil
}
func (v *legacyVideoTrack) Close() error {
if err := v.provider.Close(); err != nil {
return fmt.Errorf("close: %w", err)
}
return nil
}
func (v *legacyVideoTrack) SetShouldReconnect(fn func() bool) { v.provider.SetShouldReconnect(fn) }
func (v *legacyVideoTrack) SetEndedCallback(cb func(string)) { v.provider.SetEndedCallback(cb) }
func (v *legacyVideoTrack) WatchConnection(ctx context.Context) {
v.provider.WatchConnection(ctx)
}
func (v *legacyVideoTrack) CanSend() bool { return v.provider.CanSend() }
func (v *legacyVideoTrack) AddTrack(track webrtc.TrackLocal) error {
if err := v.provider.AddVideoTrack(track); err != nil {
return fmt.Errorf("add track: %w", err)
}
return nil
}
func (v *legacyVideoTrack) SetTrackHandler(cb func(*webrtc.TrackRemote, *webrtc.RTPReceiver)) {
v.provider.SetVideoTrackHandler(cb)
}
func (v *legacyVideoTrack) SetReconnectCallback(cb func()) {
v.provider.SetReconnectCallback(func(_ *webrtc.DataChannel) {
if cb != nil {
cb()
}
})
}

View File

@@ -4,8 +4,6 @@ package carrier
import (
"context"
"errors"
"github.com/openlibrecommunity/olcrtc/internal/provider"
)
var (
@@ -59,24 +57,6 @@ func Register(name string, factory Factory) {
registry[name] = factory
}
// RegisterLegacy adapts an existing provider factory into the carrier registry.
func RegisterLegacy(name string, factory provider.Factory) {
Register(name, func(ctx context.Context, cfg Config) (Session, error) {
legacy, err := factory(ctx, provider.Config{
RoomURL: cfg.RoomURL,
Name: cfg.Name,
OnData: cfg.OnData,
DNSServer: cfg.DNSServer,
ProxyAddr: cfg.ProxyAddr,
ProxyPort: cfg.ProxyPort,
})
if err != nil {
return nil, err
}
return &legacySession{provider: legacy}, nil
})
}
// New creates a carrier session by name.
func New(ctx context.Context, name string, cfg Config) (Session, error) {
factory, ok := registry[name]

View File

@@ -5,61 +5,14 @@ import (
"errors"
"reflect"
"testing"
"github.com/openlibrecommunity/olcrtc/internal/provider"
"github.com/pion/webrtc/v4"
)
type stubProvider struct {
connectErr error
sendErr error
closeErr error
canSend bool
reconnectCallback func(*webrtc.DataChannel)
shouldReconnect func() bool
endedCallback func(string)
watchCalled bool
addTrackErr error
trackHandlerCalled bool
}
type stubSession struct{}
func (s *stubProvider) Connect(context.Context) error { return s.connectErr }
func (s *stubProvider) Send([]byte) error { return s.sendErr }
func (s *stubProvider) Close() error { return s.closeErr }
func (s *stubProvider) SetReconnectCallback(cb func(*webrtc.DataChannel)) { s.reconnectCallback = cb }
func (s *stubProvider) SetShouldReconnect(fn func() bool) { s.shouldReconnect = fn }
func (s *stubProvider) SetEndedCallback(cb func(string)) { s.endedCallback = cb }
func (s *stubProvider) WatchConnection(context.Context) { s.watchCalled = true }
func (s *stubProvider) CanSend() bool { return s.canSend }
func (s *stubProvider) GetSendQueue() chan []byte { return nil }
func (s *stubProvider) GetBufferedAmount() uint64 { return 0 }
func (s *stubProvider) AddVideoTrack(webrtc.TrackLocal) error { return s.addTrackErr }
func (s *stubProvider) SetVideoTrackHandler(func(*webrtc.TrackRemote, *webrtc.RTPReceiver)) {
s.trackHandlerCalled = true
func (s *stubSession) Capabilities() Capabilities {
return Capabilities{ByteStream: true, VideoTrack: true}
}
type plainProvider struct {
connectErr error
sendErr error
closeErr error
canSend bool
reconnectCallback func(*webrtc.DataChannel)
shouldReconnect func() bool
endedCallback func(string)
watchCalled bool
}
func (p *plainProvider) Connect(context.Context) error { return p.connectErr }
func (p *plainProvider) Send([]byte) error { return p.sendErr }
func (p *plainProvider) Close() error { return p.closeErr }
func (p *plainProvider) SetReconnectCallback(cb func(*webrtc.DataChannel)) { p.reconnectCallback = cb }
func (p *plainProvider) SetShouldReconnect(fn func() bool) { p.shouldReconnect = fn }
func (p *plainProvider) SetEndedCallback(cb func(string)) { p.endedCallback = cb }
func (p *plainProvider) WatchConnection(context.Context) { p.watchCalled = true }
func (p *plainProvider) CanSend() bool { return p.canSend }
func (p *plainProvider) GetSendQueue() chan []byte { return nil }
func (p *plainProvider) GetBufferedAmount() uint64 { return 0 }
func snapshotCarrierRegistry() map[string]Factory {
out := make(map[string]Factory, len(registry))
for k, v := range registry {
@@ -75,18 +28,18 @@ func restoreCarrierRegistry(src map[string]Factory) {
}
}
func TestRegisterLegacyAndAvailable(t *testing.T) {
func TestRegisterAndAvailable(t *testing.T) {
old := snapshotCarrierRegistry()
t.Cleanup(func() { restoreCarrierRegistry(old) })
RegisterLegacy("legacy-test", func(_ context.Context, cfg provider.Config) (provider.Provider, error) {
Register("test-carrier", func(_ context.Context, cfg Config) (Session, error) {
if cfg.Name != "peer" {
t.Fatalf("provider config name = %q, want peer", cfg.Name)
t.Fatalf("carrier config name = %q, want peer", cfg.Name)
}
return &stubProvider{canSend: true}, nil
return &stubSession{}, nil
})
sess, err := New(context.Background(), "legacy-test", Config{Name: "peer"})
sess, err := New(context.Background(), "test-carrier", Config{Name: "peer"})
if err != nil {
t.Fatalf("New() error = %v", err)
}
@@ -96,8 +49,8 @@ func TestRegisterLegacyAndAvailable(t *testing.T) {
t.Fatalf("Capabilities() = %+v, want byte and video true", caps)
}
if !reflect.DeepEqual(Available(), []string{"legacy-test"}) {
t.Fatalf("Available() = %#v, want %#v", Available(), []string{"legacy-test"})
if !reflect.DeepEqual(Available(), []string{"test-carrier"}) {
t.Fatalf("Available() = %#v, want %#v", Available(), []string{"test-carrier"})
}
}
@@ -111,141 +64,3 @@ func TestNewReturnsErrCarrierNotFound(t *testing.T) {
t.Fatalf("New() error = %v, want %v", err, ErrCarrierNotFound)
}
}
func TestLegacySessionOpenVideoTrackUnsupported(t *testing.T) {
sess := &legacySession{provider: &plainProvider{}}
caps := sess.Capabilities()
if !caps.ByteStream || caps.VideoTrack {
t.Fatalf("Capabilities() = %+v, want byte true and video false", caps)
}
_, err := sess.OpenVideoTrack()
if !errors.Is(err, ErrVideoTrackUnsupported) {
t.Fatalf("OpenVideoTrack() error = %v, want %v", err, ErrVideoTrackUnsupported)
}
}
func TestLegacyByteStreamWrapsProviderAndCallbacks(t *testing.T) {
prov := &stubProvider{canSend: true}
stream := &legacyByteStream{provider: prov}
called := false
stream.SetReconnectCallback(func() { called = true })
if prov.reconnectCallback == nil {
t.Fatal("SetReconnectCallback() did not install provider callback")
}
prov.reconnectCallback(nil)
if !called {
t.Fatal("reconnect callback was not adapted")
}
reconnectAllowed := false
stream.SetShouldReconnect(func() bool { reconnectAllowed = true; return true })
if prov.shouldReconnect == nil || !prov.shouldReconnect() || !reconnectAllowed {
t.Fatal("SetShouldReconnect() was not forwarded")
}
ended := ""
stream.SetEndedCallback(func(reason string) { ended = reason })
if prov.endedCallback == nil {
t.Fatal("SetEndedCallback() was not forwarded")
}
prov.endedCallback("bye")
if ended != "bye" {
t.Fatalf("ended callback reason = %q, want bye", ended)
}
stream.WatchConnection(context.Background())
if !prov.watchCalled {
t.Fatal("WatchConnection() was not forwarded")
}
if !stream.CanSend() {
t.Fatal("CanSend() = false, want true")
}
}
func TestLegacyByteStreamWrapsErrors(t *testing.T) {
prov := &stubProvider{
connectErr: errors.New("connect boom"),
sendErr: errors.New("send boom"),
closeErr: errors.New("close boom"),
}
stream := &legacyByteStream{provider: prov}
if err := stream.Connect(context.Background()); err == nil || err.Error() != "connect: connect boom" {
t.Fatalf("Connect() error = %v", err)
}
if err := stream.Send([]byte("x")); err == nil || err.Error() != "send: send boom" {
t.Fatalf("Send() error = %v", err)
}
if err := stream.Close(); err == nil || err.Error() != "close: close boom" {
t.Fatalf("Close() error = %v", err)
}
}
func TestLegacySessionOpenByteStreamAndVideoTrack(t *testing.T) {
prov := &stubProvider{canSend: true}
sess := &legacySession{provider: prov}
stream, err := sess.OpenByteStream()
if err != nil {
t.Fatalf("OpenByteStream() error = %v", err)
}
if !stream.CanSend() {
t.Fatal("byte stream CanSend() = false, want true")
}
video, err := sess.OpenVideoTrack()
if err != nil {
t.Fatalf("OpenVideoTrack() error = %v", err)
}
if err := video.Connect(context.Background()); err != nil {
t.Fatalf("video Connect() error = %v", err)
}
if err := video.Close(); err != nil {
t.Fatalf("video Close() error = %v", err)
}
video.SetShouldReconnect(func() bool { return true })
video.SetEndedCallback(func(string) {})
video.WatchConnection(context.Background())
if !video.CanSend() || prov.shouldReconnect == nil || prov.endedCallback == nil || !prov.watchCalled {
t.Fatal("video adapter did not forward calls")
}
}
func TestLegacyVideoTrackWrapsOperations(t *testing.T) {
prov := &stubProvider{canSend: true, addTrackErr: errors.New("track boom")}
track := &legacyVideoTrack{provider: prov}
called := false
track.SetReconnectCallback(func() { called = true })
prov.reconnectCallback(nil)
if !called {
t.Fatal("reconnect callback was not adapted")
}
track.SetTrackHandler(func(*webrtc.TrackRemote, *webrtc.RTPReceiver) {})
if !prov.trackHandlerCalled {
t.Fatal("SetTrackHandler() was not forwarded")
}
if err := track.AddTrack(nil); err == nil || err.Error() != "add track: track boom" {
t.Fatalf("AddTrack() error = %v", err)
}
}
func TestLegacyVideoTrackWrapsConnectCloseErrors(t *testing.T) {
prov := &stubProvider{
connectErr: errors.New("connect boom"),
closeErr: errors.New("close boom"),
}
track := &legacyVideoTrack{provider: prov}
if err := track.Connect(context.Background()); err == nil || err.Error() != "connect: connect boom" {
t.Fatalf("Connect() error = %v", err)
}
if err := track.Close(); err == nil || err.Error() != "close: close boom" {
t.Fatalf("Close() error = %v", err)
}
}

View File

@@ -9,8 +9,6 @@ import (
)
var (
// ErrProviderNotFound is returned when a requested provider is not registered.
ErrProviderNotFound = errors.New("provider not found")
// ErrDataChannelTimeout is returned when the DataChannel fails to open within the timeout period.
ErrDataChannelTimeout = errors.New("datachannel timeout")
// ErrDataChannelNotReady is returned when attempting to send data before the DataChannel is open.
@@ -50,34 +48,3 @@ type Config struct {
ProxyAddr string
ProxyPort int
}
// Factory is a function that creates a new Provider instance.
type Factory func(ctx context.Context, cfg Config) (Provider, error)
// registry holds all registered provider factories.
//
//nolint:gochecknoglobals // Global registry is required for provider discovery.
var registry = make(map[string]Factory)
// Register adds a new provider factory to the registry.
func Register(name string, factory Factory) {
registry[name] = factory
}
// New creates a new Provider instance by name.
func New(ctx context.Context, name string, cfg Config) (Provider, error) {
factory, ok := registry[name]
if !ok {
return nil, ErrProviderNotFound
}
return factory(ctx, cfg)
}
// Available returns a list of registered provider names.
func Available() []string {
names := make([]string, 0, len(registry))
for name := range registry {
names = append(names, name)
}
return names
}

View File

@@ -1,75 +0,0 @@
package provider
import (
"context"
"errors"
"reflect"
"testing"
"github.com/pion/webrtc/v4"
)
type stubProvider struct{}
func (s *stubProvider) Connect(context.Context) error { return nil }
func (s *stubProvider) Send([]byte) error { return nil }
func (s *stubProvider) Close() error { return nil }
func (s *stubProvider) SetReconnectCallback(func(*webrtc.DataChannel)) {}
func (s *stubProvider) SetShouldReconnect(func() bool) {}
func (s *stubProvider) SetEndedCallback(func(string)) {}
func (s *stubProvider) WatchConnection(context.Context) {}
func (s *stubProvider) CanSend() bool { return true }
func (s *stubProvider) GetSendQueue() chan []byte { return nil }
func (s *stubProvider) GetBufferedAmount() uint64 { return 0 }
func snapshotProviderRegistry() map[string]Factory {
out := make(map[string]Factory, len(registry))
for k, v := range registry {
out[k] = v
}
return out
}
func restoreProviderRegistry(src map[string]Factory) {
registry = make(map[string]Factory, len(src))
for k, v := range src {
registry[k] = v
}
}
func TestNewAndAvailable(t *testing.T) {
old := snapshotProviderRegistry()
t.Cleanup(func() { restoreProviderRegistry(old) })
called := false
Register("test-provider", func(_ context.Context, cfg Config) (Provider, error) {
called = cfg.Name == "peer"
return &stubProvider{}, nil
})
got, err := New(context.Background(), "test-provider", Config{Name: "peer"})
if err != nil {
t.Fatalf("New() error = %v", err)
}
if !called {
t.Fatal("factory did not receive config")
}
if _, ok := got.(*stubProvider); !ok {
t.Fatalf("New() returned %T, want *stubProvider", got)
}
if !reflect.DeepEqual(Available(), []string{"test-provider"}) {
t.Fatalf("Available() = %#v, want %#v", Available(), []string{"test-provider"})
}
}
func TestNewReturnsErrProviderNotFound(t *testing.T) {
old := snapshotProviderRegistry()
t.Cleanup(func() { restoreProviderRegistry(old) })
registry = map[string]Factory{}
_, err := New(context.Background(), "missing", Config{})
if !errors.Is(err, ErrProviderNotFound) {
t.Fatalf("New() error = %v, want %v", err, ErrProviderNotFound)
}
}

View File

@@ -1,4 +1,4 @@
// Package datachannel provides a transport backed by the current WebRTC providers.
// Package datachannel provides a transport backed by the current carriers.
package datachannel
import (
@@ -15,7 +15,7 @@ type streamTransport struct {
stream carrier.ByteStream
}
// New creates a datachannel transport backed by a carrier-specific provider.
// New creates a datachannel transport backed by a carrier.
func New(ctx context.Context, cfg transport.Config) (transport.Transport, error) {
session, err := carrier.New(ctx, cfg.Carrier, carrier.Config{
RoomURL: cfg.RoomURL,
@@ -26,7 +26,7 @@ func New(ctx context.Context, cfg transport.Config) (transport.Transport, error)
ProxyPort: cfg.ProxyPort,
})
if err != nil {
return nil, fmt.Errorf("create provider transport: %w", err)
return nil, fmt.Errorf("create carrier transport: %w", err)
}
streamCapable, ok := session.(carrier.ByteStreamCapable)

View File

@@ -101,7 +101,7 @@ func TestNewErrorPaths(t *testing.T) {
carrier.Register("datachannel-fail-create", func(context.Context, carrier.Config) (carrier.Session, error) {
return nil, errors.New("boom")
})
if _, err := New(context.Background(), transport.Config{Carrier: "datachannel-fail-create"}); err == nil || err.Error() != "create provider transport: boom" {
if _, err := New(context.Background(), transport.Config{Carrier: "datachannel-fail-create"}); err == nil || err.Error() != "create carrier transport: boom" {
t.Fatalf("New() error = %v", err)
}

View File

@@ -97,7 +97,7 @@ type streamTransport struct {
batchSize int
}
// New creates a seichannel transport backed by a carrier-specific provider.
// New creates a seichannel transport backed by a carrier.
func New(ctx context.Context, cfg transport.Config) (transport.Transport, error) {
session, err := carrier.New(ctx, cfg.Carrier, carrier.Config{
RoomURL: cfg.RoomURL,
@@ -108,7 +108,7 @@ func New(ctx context.Context, cfg transport.Config) (transport.Transport, error)
ProxyPort: cfg.ProxyPort,
})
if err != nil {
return nil, fmt.Errorf("create provider transport: %w", err)
return nil, fmt.Errorf("create carrier transport: %w", err)
}
videoCapable, ok := session.(carrier.VideoTrackCapable)

View File

@@ -114,7 +114,7 @@ func TestNewErrorPaths(t *testing.T) {
carrier.Register("seichannel-create-fails", func(context.Context, carrier.Config) (carrier.Session, error) {
return nil, errors.New("boom")
})
if _, err := New(context.Background(), transport.Config{Carrier: "seichannel-create-fails"}); err == nil || err.Error() != "create provider transport: boom" {
if _, err := New(context.Background(), transport.Config{Carrier: "seichannel-create-fails"}); err == nil || err.Error() != "create carrier transport: boom" {
t.Fatalf("New() error = %v", err)
}

View File

@@ -76,7 +76,7 @@ type streamTransport struct {
idleFrameMu sync.Mutex
}
// New creates a visual videochannel transport backed by a carrier-specific provider.
// New creates a visual videochannel transport backed by a carrier.
func New(ctx context.Context, cfg transport.Config) (transport.Transport, error) {
session, err := carrier.New(ctx, cfg.Carrier, carrier.Config{
RoomURL: cfg.RoomURL,
@@ -87,7 +87,7 @@ func New(ctx context.Context, cfg transport.Config) (transport.Transport, error)
ProxyPort: cfg.ProxyPort,
})
if err != nil {
return nil, fmt.Errorf("create provider transport: %w", err)
return nil, fmt.Errorf("create carrier transport: %w", err)
}
videoCapable, ok := session.(carrier.VideoTrackCapable)

View File

@@ -104,7 +104,7 @@ func TestNewErrorPaths(t *testing.T) {
carrier.Register("videochannel-create-fails", func(context.Context, carrier.Config) (carrier.Session, error) {
return nil, errors.New("boom")
})
if _, err := New(context.Background(), transport.Config{Carrier: "videochannel-create-fails"}); err == nil || err.Error() != "create provider transport: boom" {
if _, err := New(context.Background(), transport.Config{Carrier: "videochannel-create-fails"}); err == nil || err.Error() != "create carrier transport: boom" {
t.Fatalf("New() error = %v", err)
}

View File

@@ -93,7 +93,7 @@ type streamTransport struct {
reconnectFn func()
}
// New creates a vp8channel transport backed by a carrier-specific provider.
// New creates a vp8channel transport backed by a carrier.
func New(ctx context.Context, cfg transport.Config) (transport.Transport, error) {
session, err := carrier.New(ctx, cfg.Carrier, carrier.Config{
RoomURL: cfg.RoomURL,
@@ -104,7 +104,7 @@ func New(ctx context.Context, cfg transport.Config) (transport.Transport, error)
ProxyPort: cfg.ProxyPort,
})
if err != nil {
return nil, fmt.Errorf("create provider transport: %w", err)
return nil, fmt.Errorf("create carrier transport: %w", err)
}
videoCapable, ok := session.(carrier.VideoTrackCapable)

View File

@@ -131,7 +131,7 @@ func TestNewErrorPaths(t *testing.T) {
carrier.Register("vp8channel-create-fails", func(context.Context, carrier.Config) (carrier.Session, error) {
return nil, errors.New("boom")
})
if _, err := New(context.Background(), transport.Config{Carrier: "vp8channel-create-fails"}); err == nil || err.Error() != "create provider transport: boom" {
if _, err := New(context.Background(), transport.Config{Carrier: "vp8channel-create-fails"}); err == nil || err.Error() != "create carrier transport: boom" {
t.Fatalf("New() error = %v", err)
}

View File

@@ -142,7 +142,7 @@ func SetDebug(enabled bool) {
}
// Start launches the olcRTC client in background.
// carrierName: carrier/provider name ("telemost", "jazz", "wbstream", "wbstream")
// carrierName: carrier name ("telemost", "jazz", "wbstream")
// roomID: carrier-specific room ID
// clientID: client identifier that must match the server's -client-id
// keyHex: 64-char hex encryption key

View File

@@ -30,14 +30,14 @@ if [ "$#" -gt 0 ]; then
fi
mode="${OLCRTC_MODE:-srv}"
room_id="${OLCRTC_ROOM_ID:-${ROOM_ID:-}}"
carrier="${OLCRTC_CARRIER:-${OLCRTC_PROVIDER:-}}"
room_id="${OLCRTC_ROOM_ID:-}"
carrier="${OLCRTC_CARRIER:-}"
transport="${OLCRTC_TRANSPORT:-}"
link="${OLCRTC_LINK:-direct}"
data_dir="${OLCRTC_DATA_DIR:-/usr/share/olcrtc}"
dns_server="${OLCRTC_DNS:-1.1.1.1:53}"
key="${OLCRTC_KEY:-${KEY:-}}"
client_id="${OLCRTC_CLIENT_ID:-${CLIENT_ID:-}}"
key="${OLCRTC_KEY:-}"
client_id="${OLCRTC_CLIENT_ID:-}"
key_file="${OLCRTC_KEY_FILE:-/var/lib/olcrtc/key.hex}"
socks_proxy="${OLCRTC_SOCKS_PROXY:-}"
socks_proxy_port="${OLCRTC_SOCKS_PROXY_PORT:-1080}"