diff --git a/cmd/olcrtc/main_test.go b/cmd/olcrtc/main_test.go index 465b13b..c2bb41d 100644 --- a/cmd/olcrtc/main_test.go +++ b/cmd/olcrtc/main_test.go @@ -86,7 +86,6 @@ func TestRunWithConfigValidationAndDataDirErrors(t *testing.T) { session.RegisterDefaults() scfg := session.Config{ Mode: "srv", - Link: "direct", Transport: "datachannel", Auth: "jazz", KeyHex: "key", diff --git a/internal/app/session/session.go b/internal/app/session/session.go index 37320fb..665d0cc 100644 --- a/internal/app/session/session.go +++ b/internal/app/session/session.go @@ -16,8 +16,6 @@ import ( "github.com/openlibrecommunity/olcrtc/internal/client" "github.com/openlibrecommunity/olcrtc/internal/control" "github.com/openlibrecommunity/olcrtc/internal/crypto" - "github.com/openlibrecommunity/olcrtc/internal/link" - "github.com/openlibrecommunity/olcrtc/internal/link/direct" "github.com/openlibrecommunity/olcrtc/internal/logger" "github.com/openlibrecommunity/olcrtc/internal/names" "github.com/openlibrecommunity/olcrtc/internal/server" @@ -72,13 +70,9 @@ var ( ErrURLRequired = errors.New("SFU URL required (set auth.url)") // ErrUnsupportedCarrier indicates that carrier is not registered. ErrUnsupportedCarrier = errors.New("unsupported carrier") - // ErrUnsupportedLink indicates that link is not registered. - ErrUnsupportedLink = errors.New("unsupported link") // ErrUnsupportedTransport indicates that transport is not registered. ErrUnsupportedTransport = errors.New("unsupported transport") - // ErrLinkRequired indicates that link is not provided. - ErrLinkRequired = errors.New("link required (set link to direct)") // ErrTransportRequired indicates that transport is not provided. ErrTransportRequired = errors.New( "transport required (set transport to datachannel, videochannel, seichannel or vp8channel)") @@ -154,7 +148,6 @@ var ( // Config holds runtime session settings. type Config struct { Mode string - Link string Transport string Auth string Engine string @@ -199,7 +192,6 @@ type Config struct { // RegisterDefaults registers built-in carriers and transports. func RegisterDefaults() { builtin.Register() - link.Register("direct", direct.New) transport.Register("datachannel", datachannel.New) transport.Register("videochannel", videochannel.New) transport.Register("seichannel", seichannel.New) @@ -326,9 +318,6 @@ func Validate(cfg Config) error { if err := validateAuth(cfg); err != nil { return err } - if err := validateLink(cfg); err != nil { - return err - } if err := validateTransportRegistration(cfg); err != nil { return err } @@ -369,16 +358,6 @@ func validateAuth(cfg Config) error { return nil } -func validateLink(cfg Config) error { - if cfg.Link == "" { - return ErrLinkRequired - } - if !slices.Contains(link.Available(), cfg.Link) { - return fmt.Errorf("%w: %s (available: %v)", ErrUnsupportedLink, cfg.Link, link.Available()) - } - return nil -} - func validateTransportRegistration(cfg Config) error { if cfg.Transport == "" { return ErrTransportRequired @@ -641,7 +620,6 @@ func runOnce( switch cfg.Mode { case modeSRV: if err := server.Run(ctx, server.Config{ - Link: cfg.Link, Transport: cfg.Transport, Carrier: cfg.Auth, RoomURL: roomURL, @@ -671,7 +649,6 @@ func runOnce( return nil case modeCNC: if err := client.Run(ctx, client.Config{ - Link: cfg.Link, Transport: cfg.Transport, Carrier: cfg.Auth, RoomURL: roomURL, diff --git a/internal/app/session/session_test.go b/internal/app/session/session_test.go index c2581f6..02206a3 100644 --- a/internal/app/session/session_test.go +++ b/internal/app/session/session_test.go @@ -141,7 +141,6 @@ func TestValidate(t *testing.T) { base := Config{ Mode: modeSRV, - Link: "direct", Transport: "datachannel", Auth: "telemost", RoomID: "room-1", @@ -192,15 +191,6 @@ func TestValidate(t *testing.T) { }(), want: ErrUnsupportedCarrier, }, - { - name: "unsupported link", - cfg: func() Config { - cfg := base - cfg.Link = "unknown" - return cfg - }(), - want: ErrUnsupportedLink, - }, { name: "unsupported transport", cfg: func() Config { diff --git a/internal/client/client.go b/internal/client/client.go index 08577c8..0d53215 100644 --- a/internal/client/client.go +++ b/internal/client/client.go @@ -20,7 +20,6 @@ import ( "github.com/openlibrecommunity/olcrtc/internal/control" "github.com/openlibrecommunity/olcrtc/internal/crypto" "github.com/openlibrecommunity/olcrtc/internal/handshake" - "github.com/openlibrecommunity/olcrtc/internal/link" "github.com/openlibrecommunity/olcrtc/internal/logger" "github.com/openlibrecommunity/olcrtc/internal/muxconn" "github.com/openlibrecommunity/olcrtc/internal/names" @@ -51,7 +50,7 @@ var ( // Client handles local SOCKS5 connections and tunnels them to the server. type Client struct { - ln link.Link + ln transport.Transport cipher *crypto.Cipher conn *muxconn.Conn session *smux.Session @@ -75,7 +74,6 @@ type HealthFunc func(control.Status) // Config holds runtime configuration for [Run] and [RunWithReady]. type Config struct { - Link string Transport string Carrier string RoomURL string @@ -177,20 +175,19 @@ func (c *Client) bringUpLink( cfg Config, cancel context.CancelFunc, ) error { - ln, err := link.New(ctx, cfg.Link, link.Config{ - Transport: cfg.Transport, - Carrier: cfg.Carrier, - RoomURL: cfg.RoomURL, - Engine: cfg.Engine, - URL: cfg.URL, - Token: cfg.Token, - ChannelID: cfg.ChannelID, - DeviceID: c.deviceID, - Name: names.Generate(), - OnData: c.onData, - DNSServer: cfg.DNSServer, - TransportOptions: cfg.TransportOptions, - Traffic: cfg.Traffic, + ln, err := transport.New(ctx, cfg.Transport, transport.Config{ + Carrier: cfg.Carrier, + RoomURL: cfg.RoomURL, + Engine: cfg.Engine, + URL: cfg.URL, + Token: cfg.Token, + ChannelID: cfg.ChannelID, + DeviceID: c.deviceID, + Name: names.Generate(), + OnData: c.onData, + DNSServer: cfg.DNSServer, + Options: cfg.TransportOptions, + Traffic: cfg.Traffic, }) if err != nil { return fmt.Errorf("failed to create link: %w", err) @@ -325,12 +322,8 @@ func smuxConfig(maxWirePayload ...int) *smux.Config { return cfg } -func linkMaxPayload(ln link.Link) int { - provider, ok := ln.(link.FeaturesProvider) - if !ok { - return 0 - } - return provider.Features().MaxPayloadSize +func linkMaxPayload(tr transport.Transport) int { + return tr.Features().MaxPayloadSize } func (c *Client) handleReconnect(ctx context.Context, cfg Config, cancel context.CancelFunc, reason string) bool { diff --git a/internal/client/client_test.go b/internal/client/client_test.go index 9f624f8..d15229a 100644 --- a/internal/client/client_test.go +++ b/internal/client/client_test.go @@ -14,6 +14,7 @@ import ( "github.com/openlibrecommunity/olcrtc/internal/control" cryptopkg "github.com/openlibrecommunity/olcrtc/internal/crypto" "github.com/openlibrecommunity/olcrtc/internal/muxconn" + "github.com/openlibrecommunity/olcrtc/internal/transport" "github.com/xtaci/smux" ) @@ -493,14 +494,15 @@ type closerLinkStub struct { closed bool } -func (s *closerLinkStub) Connect(context.Context) error { return nil } -func (s *closerLinkStub) Send([]byte) error { return nil } -func (s *closerLinkStub) Close() error { s.closed = true; return nil } -func (s *closerLinkStub) SetReconnectCallback(func()) {} -func (s *closerLinkStub) SetShouldReconnect(func() bool) {} -func (s *closerLinkStub) SetEndedCallback(func(string)) {} -func (s *closerLinkStub) WatchConnection(context.Context) {} -func (s *closerLinkStub) CanSend() bool { return true } +func (s *closerLinkStub) Connect(context.Context) error { return nil } +func (s *closerLinkStub) Send([]byte) error { return nil } +func (s *closerLinkStub) Close() error { s.closed = true; return nil } +func (s *closerLinkStub) SetReconnectCallback(func()) {} +func (s *closerLinkStub) SetShouldReconnect(func() bool) {} +func (s *closerLinkStub) SetEndedCallback(func(string)) {} +func (s *closerLinkStub) WatchConnection(context.Context) {} +func (s *closerLinkStub) CanSend() bool { return true } +func (s *closerLinkStub) Features() transport.Features { return transport.Features{} } func TestOnDataWithNilConn(_ *testing.T) { c := &Client{} diff --git a/internal/config/config.go b/internal/config/config.go index d831669..5af1fd3 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -31,7 +31,6 @@ var ( // File is the on-disk YAML schema. type File struct { Mode string `yaml:"mode"` - Link string `yaml:"link"` Auth Auth `yaml:"auth"` Room Room `yaml:"room"` Crypto Crypto `yaml:"crypto"` @@ -55,7 +54,6 @@ type File struct { // Profile is a failover entry that overrides top-level runtime fields. type Profile struct { Name string `yaml:"name"` - Link string `yaml:"link"` Auth Auth `yaml:"auth"` Room Room `yaml:"room"` Crypto Crypto `yaml:"crypto"` @@ -243,7 +241,6 @@ func readKeyFile(configPath, keyFile string) (string, error) { // YAML values fill in the rest. func Apply(dst session.Config, f File) session.Config { dst.Mode = pickString(dst.Mode, f.Mode) - dst.Link = pickString(dst.Link, f.Link) dst.Transport = pickString(dst.Transport, f.Net.Transport) dst.Auth = pickString(dst.Auth, f.Auth.Provider) dst.Engine = pickString(dst.Engine, f.Engine.Name) @@ -289,7 +286,6 @@ func Apply(dst session.Config, f File) session.Config { // ApplyProfile overlays a failover profile onto an already-applied base config. func ApplyProfile(base session.Config, p Profile) session.Config { dst := base - dst.Link = overlayString(dst.Link, p.Link) dst.Transport = overlayString(dst.Transport, p.Net.Transport) dst.Auth = overlayString(dst.Auth, p.Auth.Provider) dst.Engine = overlayString(dst.Engine, p.Engine.Name) diff --git a/internal/config/config_test.go b/internal/config/config_test.go index cd6d871..0dbef4e 100644 --- a/internal/config/config_test.go +++ b/internal/config/config_test.go @@ -87,7 +87,6 @@ func requireAppliedConfig(t *testing.T, got session.Config) { t.Helper() want := session.Config{ Mode: testModeSrv, - Link: "direct", Auth: testAuthProvider, RoomID: testRoomID, KeyHex: testCryptoKey, diff --git a/internal/e2e/tunnel_test.go b/internal/e2e/tunnel_test.go index 65f4ce6..1af02f0 100644 --- a/internal/e2e/tunnel_test.go +++ b/internal/e2e/tunnel_test.go @@ -25,7 +25,6 @@ import ( authWBStream "github.com/openlibrecommunity/olcrtc/internal/auth/wbstream" "github.com/openlibrecommunity/olcrtc/internal/carrier" "github.com/openlibrecommunity/olcrtc/internal/client" - "github.com/openlibrecommunity/olcrtc/internal/link" "github.com/openlibrecommunity/olcrtc/internal/server" "github.com/openlibrecommunity/olcrtc/internal/supervisor" "github.com/openlibrecommunity/olcrtc/internal/transport" @@ -41,7 +40,6 @@ const ( transportVideo = "videochannel" transportSEI = "seichannel" transportVP8 = "vp8channel" - linkDirect = "direct" testRoom = "room" localDNSServer = "127.0.0.1:53" videoHWNone = "none" @@ -635,7 +633,6 @@ func requireRealRoom(ctx context.Context, t *testing.T, carrierName string) stri func validSessionConfig(mode, carrierName, transportName string) session.Config { return session.Config{ Mode: mode, - Link: linkDirect, Transport: transportName, Auth: carrierName, RoomID: testRoom, @@ -687,39 +684,15 @@ func e2eTransportOptions(transportName string) transport.Options { return nil } -func validLinkConfig(carrierName, transportName string) link.Config { +func validTransportConfig(carrierName, transportName string) transport.Config { cfg := validSessionConfig("cnc", carrierName, transportName) - var opts transport.Options - switch transportName { - case "videochannel": - opts = videochannel.Options{ - Width: cfg.VideoWidth, - Height: cfg.VideoHeight, - FPS: cfg.VideoFPS, - Bitrate: cfg.VideoBitrate, - HW: cfg.VideoHW, - Codec: cfg.VideoCodec, - TileModule: cfg.VideoTileModule, - TileRS: cfg.VideoTileRS, - } - case "vp8channel": - opts = vp8channel.Options{FPS: cfg.VP8FPS, BatchSize: cfg.VP8BatchSize} - case "seichannel": - opts = seichannel.Options{ - FPS: cfg.SEIFPS, - BatchSize: cfg.SEIBatchSize, - FragmentSize: cfg.SEIFragmentSize, - AckTimeoutMS: cfg.SEIAckTimeoutMS, - } - } - return link.Config{ - Transport: cfg.Transport, - Carrier: cfg.Auth, - RoomURL: testRoom, - DeviceID: "e2e-link-test", - Name: "e2e-" + carrierName + "-" + transportName, - DNSServer: cfg.DNSServer, - TransportOptions: opts, + return transport.Config{ + Carrier: cfg.Auth, + RoomURL: testRoom, + DeviceID: "e2e-link-test", + Name: "e2e-" + carrierName + "-" + transportName, + DNSServer: cfg.DNSServer, + Options: e2eTransportOptions(transportName), } } @@ -792,7 +765,6 @@ func startTunnel(t *testing.T) *tunnelRuntime { serverErr := make(chan error, 1) go func() { serverErr <- server.Run(ctx, server.Config{ - Link: linkDirect, Transport: transportData, Carrier: carrierName, RoomURL: testRoom, @@ -806,7 +778,6 @@ func startTunnel(t *testing.T) *tunnelRuntime { clientErr := make(chan error, 1) go func() { clientErr <- client.RunWithReady(ctx, client.Config{ - Link: linkDirect, Transport: transportData, Carrier: carrierName, RoomURL: testRoom, @@ -845,7 +816,6 @@ func startRealTunnel( serverErr := make(chan error, 1) go func() { serverErr <- server.Run(runCtx, server.Config{ - Link: linkDirect, Transport: transportName, Carrier: carrierName, RoomURL: roomURL, @@ -870,7 +840,6 @@ func startRealTunnel( clientErr := make(chan error, 1) go func() { clientErr <- client.RunWithReady(runCtx, client.Config{ - Link: linkDirect, Transport: transportName, Carrier: carrierName, RoomURL: roomURL, @@ -1029,7 +998,7 @@ func TestBuiltInProviderTransportMatrixValidates(t *testing.T) { } } -func TestDirectLinkCreatesAllProviderTransportCombinations(t *testing.T) { +func TestTransportCreatesAllProviderTransportCombinations(t *testing.T) { session.RegisterDefaults() for _, carrierName := range builtInCarrierNames() { @@ -1040,11 +1009,11 @@ func TestDirectLinkCreatesAllProviderTransportCombinations(t *testing.T) { t.Run(carrierName, func(t *testing.T) { for _, transportName := range builtInTransportNames() { t.Run(transportName, func(t *testing.T) { - ln, err := link.New(context.Background(), linkDirect, validLinkConfig(carrierName, transportName)) + tr, err := transport.New(context.Background(), transportName, validTransportConfig(carrierName, transportName)) if err != nil { - t.Fatalf("link.New() error = %v", err) + t.Fatalf("transport.New() error = %v", err) } - if err := ln.Close(); err != nil { + if err := tr.Close(); err != nil { t.Fatalf("Close() error = %v", err) } }) @@ -1053,7 +1022,7 @@ func TestDirectLinkCreatesAllProviderTransportCombinations(t *testing.T) { } } -func TestDirectLinkConnectsFastProviderTransportMatrix(t *testing.T) { +func TestTransportConnectsFastProviderTransportMatrix(t *testing.T) { session.RegisterDefaults() for _, carrierName := range builtInCarrierNames() { @@ -1064,15 +1033,15 @@ func TestDirectLinkConnectsFastProviderTransportMatrix(t *testing.T) { t.Run(carrierName, func(t *testing.T) { for _, transportName := range []string{transportData, transportSEI} { t.Run(transportName, func(t *testing.T) { - ln, err := link.New(context.Background(), linkDirect, validLinkConfig(carrierName, transportName)) + tr, err := transport.New(context.Background(), transportName, validTransportConfig(carrierName, transportName)) if err != nil { - t.Fatalf("link.New() error = %v", err) + t.Fatalf("transport.New() error = %v", err) } - if err := ln.Connect(context.Background()); err != nil { + if err := tr.Connect(context.Background()); err != nil { t.Fatalf("Connect() error = %v", err) } - assertLinkCanSendAfterConnect(t, ln, transportName) - if err := ln.Close(); err != nil { + assertTransportCanSendAfterConnect(t, tr, transportName) + if err := tr.Close(); err != nil { t.Fatalf("Close() error = %v", err) } }) @@ -1081,16 +1050,16 @@ func TestDirectLinkConnectsFastProviderTransportMatrix(t *testing.T) { } } -func assertLinkCanSendAfterConnect(t *testing.T, ln link.Link, transportName string) { +func assertTransportCanSendAfterConnect(t *testing.T, tr transport.Transport, transportName string) { t.Helper() if transportName == transportSEI { - if ln.CanSend() { + if tr.CanSend() { t.Fatal("CanSend() = true before peer seichannel frame") } return } - if !ln.CanSend() { + if !tr.CanSend() { t.Fatal("CanSend() = false, want true") } } @@ -1312,7 +1281,6 @@ func TestSupervisorFailoverProfilesReachWorkingSOCKS(t *testing.T) { func failoverSessionConfig(mode, carrierName, socksHost string, socksPort int) session.Config { cfg := session.Config{ Mode: mode, - Link: linkDirect, Transport: transportData, Auth: carrierName, RoomID: testRoom, @@ -1328,7 +1296,6 @@ func failoverSessionConfig(mode, carrierName, socksHost string, socksPort int) s func clientConfigFromSession(cfg session.Config, socksAddr string) client.Config { return client.Config{ - Link: cfg.Link, Transport: cfg.Transport, Carrier: cfg.Auth, RoomURL: cfg.RoomID, diff --git a/internal/link/direct/direct.go b/internal/link/direct/direct.go deleted file mode 100644 index dc4b7cc..0000000 --- a/internal/link/direct/direct.go +++ /dev/null @@ -1,71 +0,0 @@ -// Package direct provides a pass-through link implementation above transports. -package direct - -import ( - "context" - "fmt" - - "github.com/openlibrecommunity/olcrtc/internal/link" - "github.com/openlibrecommunity/olcrtc/internal/transport" -) - -type directLink struct { - transport transport.Transport -} - -// New creates a direct link that forwards bytes to the selected transport. -func New(ctx context.Context, cfg link.Config) (link.Link, error) { - tr, err := transport.New(ctx, cfg.Transport, transport.Config{ - Carrier: cfg.Carrier, - RoomURL: cfg.RoomURL, - Engine: cfg.Engine, - URL: cfg.URL, - Token: cfg.Token, - ChannelID: cfg.ChannelID, - DeviceID: cfg.DeviceID, - Name: cfg.Name, - OnData: cfg.OnData, - DNSServer: cfg.DNSServer, - ProxyAddr: cfg.ProxyAddr, - ProxyPort: cfg.ProxyPort, - Options: cfg.TransportOptions, - Traffic: cfg.Traffic, - }) - if err != nil { - return nil, fmt.Errorf("create transport for direct link: %w", err) - } - - return &directLink{transport: tr}, nil -} - -func (d *directLink) Connect(ctx context.Context) error { - if err := d.transport.Connect(ctx); err != nil { - return fmt.Errorf("transport connect: %w", err) - } - return nil -} - -func (d *directLink) Send(data []byte) error { - if err := d.transport.Send(data); err != nil { - return fmt.Errorf("transport send: %w", err) - } - return nil -} - -func (d *directLink) Close() error { - if err := d.transport.Close(); err != nil { - return fmt.Errorf("transport close: %w", err) - } - return nil -} - -func (d *directLink) SetReconnectCallback(cb func()) { d.transport.SetReconnectCallback(cb) } -func (d *directLink) SetShouldReconnect(fn func() bool) { d.transport.SetShouldReconnect(fn) } -func (d *directLink) SetEndedCallback(cb func(string)) { d.transport.SetEndedCallback(cb) } -func (d *directLink) WatchConnection(ctx context.Context) { - d.transport.WatchConnection(ctx) -} -func (d *directLink) CanSend() bool { return d.transport.CanSend() } - -// Features reports the direct link's underlying transport capabilities. -func (d *directLink) Features() link.Features { return d.transport.Features() } diff --git a/internal/link/direct/direct_test.go b/internal/link/direct/direct_test.go deleted file mode 100644 index 5eec46b..0000000 --- a/internal/link/direct/direct_test.go +++ /dev/null @@ -1,163 +0,0 @@ -package direct - -import ( - "context" - "errors" - "testing" - - "github.com/openlibrecommunity/olcrtc/internal/link" - "github.com/openlibrecommunity/olcrtc/internal/transport" - "github.com/openlibrecommunity/olcrtc/internal/transport/videochannel" -) - -var ( - errDirectBoom = errors.New("boom") - errDirectConnectBoom = errors.New("connect boom") - errDirectSendBoom = errors.New("send boom") - errDirectCloseBoom = errors.New("close boom") -) - -type stubTransport struct { - connectErr error - sendErr error - closeErr error - canSend bool - - connectCalled bool - sendData []byte - watched bool - reconnectCB func() - shouldFn func() bool - endedCB func(string) -} - -func (s *stubTransport) Connect(context.Context) error { - s.connectCalled = true - return s.connectErr -} -func (s *stubTransport) Send(data []byte) error { - s.sendData = append([]byte(nil), data...) - return s.sendErr -} -func (s *stubTransport) Close() error { return s.closeErr } -func (s *stubTransport) SetReconnectCallback(cb func()) { - s.reconnectCB = cb -} -func (s *stubTransport) SetShouldReconnect(fn func() bool) { s.shouldFn = fn } -func (s *stubTransport) SetEndedCallback(cb func(string)) { s.endedCB = cb } -func (s *stubTransport) WatchConnection(context.Context) { s.watched = true } -func (s *stubTransport) CanSend() bool { return s.canSend } -func (s *stubTransport) Features() transport.Features { return transport.Features{} } - -//nolint:cyclop // table-driven test naturally has many branches -func TestNewForwardsConfigAndMethods(t *testing.T) { - name := "direct-test-forward" - var seen transport.Config - tr := &stubTransport{canSend: true} - transport.Register(name, func(_ context.Context, cfg transport.Config) (transport.Transport, error) { - seen = cfg - return tr, nil - }) - - wantOpts := videochannel.Options{ - Width: 640, - Height: 480, - FPS: 30, - Bitrate: "1M", - HW: "none", - QRSize: 4, - QRRecovery: "low", - Codec: "qrcode", - TileModule: 3, - TileRS: 20, - } - - ln, err := New(context.Background(), link.Config{ - Transport: name, - Carrier: "carrier", - RoomURL: "room", - DeviceID: "client", - Name: "peer", - DNSServer: "1.1.1.1:53", - ProxyAddr: "127.0.0.1", - ProxyPort: 1080, - TransportOptions: wantOpts, - Traffic: transport.TrafficConfig{MaxPayloadSize: 4096}, - }) - if err != nil { - t.Fatalf("New() error = %v", err) - } - - gotOpts, ok := seen.Options.(videochannel.Options) - if !ok { - t.Fatalf("forwarded Options type = %T, want videochannel.Options", seen.Options) - } - if gotOpts != wantOpts { - t.Fatalf("forwarded Options = %+v, want %+v", gotOpts, wantOpts) - } - if seen.DeviceID != "client" || seen.ProxyPort != 1080 || seen.Traffic.MaxPayloadSize != 4096 { - t.Fatalf("forwarded config = %+v", seen) - } - - if err := ln.Connect(context.Background()); err != nil { - t.Fatalf("Connect() error = %v", err) - } - if !tr.connectCalled { - t.Fatal("Connect() was not forwarded") - } - - if err := ln.Send([]byte("payload")); err != nil { - t.Fatalf("Send() error = %v", err) - } - if string(tr.sendData) != "payload" { - t.Fatalf("Send() forwarded %q, want payload", tr.sendData) - } - - ln.SetReconnectCallback(func() {}) - ln.SetShouldReconnect(func() bool { return true }) - ln.SetEndedCallback(func(string) {}) - ln.WatchConnection(context.Background()) - if tr.reconnectCB == nil || tr.shouldFn == nil || tr.endedCB == nil || !tr.watched { - t.Fatal("callbacks/watch were not forwarded") - } - if !ln.CanSend() { - t.Fatal("CanSend() = false, want true") - } - provider, ok := ln.(link.FeaturesProvider) - if !ok { - t.Fatalf("New() type = %T, want link.FeaturesProvider", ln) - } - if features := provider.Features(); features.MaxPayloadSize != 4096 { - t.Fatalf("Features() = %+v, want shaped max payload 4096", features) - } -} - -func TestNewWrapsFactoryError(t *testing.T) { - name := "direct-test-error" - transport.Register(name, func(context.Context, transport.Config) (transport.Transport, error) { - return nil, errDirectBoom - }) - - _, err := New(context.Background(), link.Config{Transport: name}) - if err == nil || err.Error() != "create transport for direct link: boom" { - t.Fatalf("New() error = %v", err) - } -} - -func TestDirectLinkWrapsTransportErrors(t *testing.T) { - ln := &directLink{transport: &stubTransport{ - connectErr: errDirectConnectBoom, - sendErr: errDirectSendBoom, - closeErr: errDirectCloseBoom, - }} - - if err := ln.Connect(context.Background()); err == nil || err.Error() != "transport connect: connect boom" { - t.Fatalf("Connect() error = %v", err) - } - if err := ln.Send([]byte("x")); err == nil || err.Error() != "transport send: send boom" { - t.Fatalf("Send() error = %v", err) - } - if err := ln.Close(); err == nil || err.Error() != "transport close: close boom" { - t.Fatalf("Close() error = %v", err) - } -} diff --git a/internal/link/link.go b/internal/link/link.go deleted file mode 100644 index 84f640f..0000000 --- a/internal/link/link.go +++ /dev/null @@ -1,85 +0,0 @@ -// Package link defines link-layer abstractions above transports. -package link - -import ( - "context" - "errors" - - "github.com/openlibrecommunity/olcrtc/internal/transport" -) - -var ( - // ErrLinkNotFound is returned when a requested link is not registered. - ErrLinkNotFound = errors.New("link not found") -) - -// Link defines a byte link above a transport. -type Link interface { - Connect(ctx context.Context) error - Send(data []byte) error - Close() error - SetReconnectCallback(cb func()) - SetShouldReconnect(fn func() bool) - SetEndedCallback(cb func(string)) - WatchConnection(ctx context.Context) - CanSend() bool -} - -// Features mirrors the underlying transport capabilities when a link can expose them. -type Features = transport.Features - -// FeaturesProvider is optionally implemented by links that can report wire limits. -type FeaturesProvider interface { - Features() Features -} - -// Config holds common link configuration. -type Config struct { - Transport string - Carrier string - RoomURL string - // Engine, URL, Token are forwarded for the "none" auth carrier. - Engine string - URL string - Token string - ChannelID string - DeviceID string - Name string - OnData func([]byte) - DNSServer string - ProxyAddr string - ProxyPort int - - // TransportOptions is forwarded verbatim to transport.Config.Options. - TransportOptions transport.Options - - Traffic transport.TrafficConfig -} - -// Factory creates a link instance. -type Factory func(ctx context.Context, cfg Config) (Link, error) - -var registry = make(map[string]Factory) //nolint:gochecknoglobals // package-level state intentional - -// Register adds a link factory to the registry. -func Register(name string, factory Factory) { - registry[name] = factory -} - -// New creates a link instance by name. -func New(ctx context.Context, name string, cfg Config) (Link, error) { - factory, ok := registry[name] - if !ok { - return nil, ErrLinkNotFound - } - return factory(ctx, cfg) -} - -// Available returns a list of registered link names. -func Available() []string { - names := make([]string, 0, len(registry)) - for name := range registry { - names = append(names, name) - } - return names -} diff --git a/internal/link/link_test.go b/internal/link/link_test.go deleted file mode 100644 index 15260cc..0000000 --- a/internal/link/link_test.go +++ /dev/null @@ -1,71 +0,0 @@ -package link - -import ( - "context" - "errors" - "reflect" - "testing" -) - -type stubLink struct{} - -func (s *stubLink) Connect(context.Context) error { return nil } -func (s *stubLink) Send([]byte) error { return nil } -func (s *stubLink) Close() error { return nil } -func (s *stubLink) SetReconnectCallback(func()) {} -func (s *stubLink) SetShouldReconnect(func() bool) {} -func (s *stubLink) SetEndedCallback(func(string)) {} -func (s *stubLink) WatchConnection(context.Context) {} -func (s *stubLink) CanSend() bool { return true } - -func snapshotLinkRegistry() map[string]Factory { - out := make(map[string]Factory, len(registry)) - for k, v := range registry { - out[k] = v - } - return out -} - -func restoreLinkRegistry(src map[string]Factory) { - registry = make(map[string]Factory, len(src)) - for k, v := range src { - registry[k] = v - } -} - -func TestNewAndAvailable(t *testing.T) { - old := snapshotLinkRegistry() - t.Cleanup(func() { restoreLinkRegistry(old) }) - - called := false - Register("test-link", func(_ context.Context, cfg Config) (Link, error) { - called = cfg.DeviceID == "client-1" - return &stubLink{}, nil - }) - - got, err := New(context.Background(), "test-link", Config{DeviceID: "client-1"}) - if err != nil { - t.Fatalf("New() error = %v", err) - } - if !called { - t.Fatal("factory did not receive config") - } - if _, ok := got.(*stubLink); !ok { - t.Fatalf("New() returned %T, want *stubLink", got) - } - - if !reflect.DeepEqual(Available(), []string{"test-link"}) { - t.Fatalf("Available() = %#v, want %#v", Available(), []string{"test-link"}) - } -} - -func TestNewReturnsErrLinkNotFound(t *testing.T) { - old := snapshotLinkRegistry() - t.Cleanup(func() { restoreLinkRegistry(old) }) - registry = map[string]Factory{} - - _, err := New(context.Background(), "missing", Config{}) - if !errors.Is(err, ErrLinkNotFound) { - t.Fatalf("New() error = %v, want %v", err, ErrLinkNotFound) - } -} diff --git a/internal/muxconn/conn.go b/internal/muxconn/conn.go index 1bf8a22..5b4c288 100644 --- a/internal/muxconn/conn.go +++ b/internal/muxconn/conn.go @@ -24,16 +24,16 @@ import ( "time" "github.com/openlibrecommunity/olcrtc/internal/crypto" - "github.com/openlibrecommunity/olcrtc/internal/link" "github.com/openlibrecommunity/olcrtc/internal/logger" + "github.com/openlibrecommunity/olcrtc/internal/transport" ) // ErrClosed is returned from Read/Write after the conn has been closed. var ErrClosed = errors.New("muxconn: closed") -// Conn is an io.ReadWriteCloser over a link.Link with optional AEAD wrapping. +// Conn is an io.ReadWriteCloser over a [transport.Transport] with optional AEAD wrapping. type Conn struct { - ln link.Link + ln transport.Transport cipher *crypto.Cipher mu sync.Mutex @@ -42,9 +42,9 @@ type Conn struct { closed bool } -// New wires a Conn over the given link. Push must be set as the link's OnData -// callback before this conn is used. -func New(ln link.Link, cipher *crypto.Cipher) *Conn { +// New wires a Conn over the given transport. Push must be set as the +// transport's OnData callback before this conn is used. +func New(ln transport.Transport, cipher *crypto.Cipher) *Conn { c := &Conn{ln: ln, cipher: cipher} c.cond = sync.NewCond(&c.mu) return c diff --git a/internal/muxconn/conn_test.go b/internal/muxconn/conn_test.go index 8df5424..652ce90 100644 --- a/internal/muxconn/conn_test.go +++ b/internal/muxconn/conn_test.go @@ -10,6 +10,7 @@ import ( "time" cryptopkg "github.com/openlibrecommunity/olcrtc/internal/crypto" + "github.com/openlibrecommunity/olcrtc/internal/transport" ) var errMuxBoom = errors.New("boom") @@ -22,12 +23,13 @@ type stubLink struct { canSendFn func() bool } -func (s *stubLink) Connect(context.Context) error { return nil } -func (s *stubLink) Close() error { return nil } -func (s *stubLink) SetReconnectCallback(func()) {} -func (s *stubLink) SetShouldReconnect(func() bool) {} -func (s *stubLink) SetEndedCallback(func(string)) {} -func (s *stubLink) WatchConnection(context.Context) {} +func (s *stubLink) Connect(context.Context) error { return nil } +func (s *stubLink) Close() error { return nil } +func (s *stubLink) SetReconnectCallback(func()) {} +func (s *stubLink) SetShouldReconnect(func() bool) {} +func (s *stubLink) SetEndedCallback(func(string)) {} +func (s *stubLink) WatchConnection(context.Context) {} +func (s *stubLink) Features() transport.Features { return transport.Features{} } func (s *stubLink) Send(data []byte) error { s.mu.Lock() defer s.mu.Unlock() diff --git a/internal/server/server.go b/internal/server/server.go index 9b22caa..882a8e8 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -17,7 +17,6 @@ import ( "github.com/openlibrecommunity/olcrtc/internal/control" "github.com/openlibrecommunity/olcrtc/internal/crypto" "github.com/openlibrecommunity/olcrtc/internal/handshake" - "github.com/openlibrecommunity/olcrtc/internal/link" "github.com/openlibrecommunity/olcrtc/internal/logger" "github.com/openlibrecommunity/olcrtc/internal/muxconn" "github.com/openlibrecommunity/olcrtc/internal/names" @@ -56,7 +55,7 @@ type HealthFunc func(control.Status) // Server handles incoming tunnel connections and proxies their traffic. type Server struct { - ln link.Link + ln transport.Transport cipher *crypto.Cipher conn *muxconn.Conn session *smux.Session @@ -89,7 +88,6 @@ type ConnectRequest struct { // Config holds runtime configuration for [Run]. type Config struct { - Link string Transport string Carrier string RoomURL string @@ -240,12 +238,8 @@ func smuxConfig(maxWirePayload ...int) *smux.Config { return cfg } -func linkMaxPayload(ln link.Link) int { - provider, ok := ln.(link.FeaturesProvider) - if !ok { - return 0 - } - return provider.Features().MaxPayloadSize +func linkMaxPayload(tr transport.Transport) int { + return tr.Features().MaxPayloadSize } func (s *Server) bringUpLink( @@ -253,25 +247,24 @@ func (s *Server) bringUpLink( cfg Config, cancel context.CancelFunc, ) error { - ln, err := link.New(ctx, cfg.Link, link.Config{ - Transport: cfg.Transport, - Carrier: cfg.Carrier, - RoomURL: cfg.RoomURL, - Engine: cfg.Engine, - URL: cfg.URL, - Token: cfg.Token, - ChannelID: cfg.ChannelID, - DeviceID: "", - Name: names.Generate(), - OnData: s.onData, - DNSServer: s.dnsServer, - ProxyAddr: s.socksProxyAddr, - ProxyPort: s.socksProxyPort, - TransportOptions: cfg.TransportOptions, - Traffic: cfg.Traffic, + ln, err := transport.New(ctx, cfg.Transport, transport.Config{ + Carrier: cfg.Carrier, + RoomURL: cfg.RoomURL, + Engine: cfg.Engine, + URL: cfg.URL, + Token: cfg.Token, + ChannelID: cfg.ChannelID, + DeviceID: "", + Name: names.Generate(), + OnData: s.onData, + DNSServer: s.dnsServer, + ProxyAddr: s.socksProxyAddr, + ProxyPort: s.socksProxyPort, + Options: cfg.TransportOptions, + Traffic: cfg.Traffic, }) if err != nil { - return fmt.Errorf("failed to create link: %w", err) + return fmt.Errorf("failed to create transport: %w", err) } s.ln = ln @@ -287,7 +280,7 @@ func (s *Server) bringUpLink( s.handleReconnect() }) - logger.Infof("Connecting link via %s/%s/%s...", cfg.Link, cfg.Transport, cfg.Carrier) + logger.Infof("Connecting transport=%s carrier=%s ...", cfg.Transport, cfg.Carrier) if err := ln.Connect(ctx); err != nil { return fmt.Errorf("failed to connect link: %w", err) } diff --git a/internal/server/server_test.go b/internal/server/server_test.go index 05bbbf5..67ce828 100644 --- a/internal/server/server_test.go +++ b/internal/server/server_test.go @@ -14,6 +14,7 @@ import ( "github.com/openlibrecommunity/olcrtc/internal/control" cryptopkg "github.com/openlibrecommunity/olcrtc/internal/crypto" "github.com/openlibrecommunity/olcrtc/internal/muxconn" + "github.com/openlibrecommunity/olcrtc/internal/transport" "github.com/xtaci/smux" ) @@ -212,14 +213,15 @@ type serverLinkStub struct { closed bool } -func (s *serverLinkStub) Connect(context.Context) error { return nil } -func (s *serverLinkStub) Send([]byte) error { return nil } -func (s *serverLinkStub) Close() error { s.closed = true; return nil } -func (s *serverLinkStub) SetReconnectCallback(func()) {} -func (s *serverLinkStub) SetShouldReconnect(func() bool) {} -func (s *serverLinkStub) SetEndedCallback(func(string)) {} -func (s *serverLinkStub) WatchConnection(context.Context) {} -func (s *serverLinkStub) CanSend() bool { return true } +func (s *serverLinkStub) Connect(context.Context) error { return nil } +func (s *serverLinkStub) Send([]byte) error { return nil } +func (s *serverLinkStub) Close() error { s.closed = true; return nil } +func (s *serverLinkStub) SetReconnectCallback(func()) {} +func (s *serverLinkStub) SetShouldReconnect(func() bool) {} +func (s *serverLinkStub) SetEndedCallback(func(string)) {} +func (s *serverLinkStub) WatchConnection(context.Context) {} +func (s *serverLinkStub) CanSend() bool { return true } +func (s *serverLinkStub) Features() transport.Features { return transport.Features{} } func TestShutdownClosesLinkAndConn(t *testing.T) { cipher, err := cryptopkg.NewCipher("01234567890123456789012345678901") diff --git a/mobile/mobile.go b/mobile/mobile.go index b3d42af..0eb62f9 100644 --- a/mobile/mobile.go +++ b/mobile/mobile.go @@ -50,7 +50,6 @@ var ( ) const ( - defaultLink = "direct" defaultTransport = "vp8channel" dataTransport = "datachannel" defaultDNSServer = "1.1.1.1:53" @@ -80,7 +79,6 @@ var ( ) type mobileConfig struct { - link string transport string dnsServer string vp8FPS int @@ -123,15 +121,6 @@ func SetTransport(transport string) { defaults.transport = normalizeTransport(transport) } -// SetLink selects the link used by Start. -// Supported value today: direct. -func SetLink(link string) { - mu.Lock() - defer mu.Unlock() - ensureDefaultConfigLocked() - defaults.link = link -} - // SetDNS selects the DNS server used by the tunnel. func SetDNS(dnsServer string) { mu.Lock() @@ -243,7 +232,6 @@ func Check( doneCh <- runClientWithReady( ctx, client.Config{ - Link: defaultLink, Transport: transportName, Carrier: carrierName, RoomURL: buildRoomURL(carrierName, roomID), @@ -334,7 +322,6 @@ func Ping( doneCh <- runClientWithReady( ctx, client.Config{ - Link: defaultLink, Transport: transportName, Carrier: carrierName, RoomURL: buildRoomURL(carrierName, roomID), @@ -582,7 +569,6 @@ func startWithConfig( err := runClientWithReady( ctx, client.Config{ - Link: cfg.link, Transport: cfg.transport, Carrier: carrierName, RoomURL: roomURL, @@ -707,7 +693,6 @@ func waitForCheckDone(doneCh <-chan error) { func ensureDefaultConfigLocked() { defaultsSet.Do(func() { defaults = mobileConfig{ - link: defaultLink, transport: defaultTransport, dnsServer: defaultDNSServer, vp8FPS: 60, diff --git a/mobile/mobile_test.go b/mobile/mobile_test.go index 3333ddb..0c81b84 100644 --- a/mobile/mobile_test.go +++ b/mobile/mobile_test.go @@ -83,7 +83,6 @@ func TestDefaultsAndSetters(t *testing.T) { resetMobileGlobals(t) SetTransport("dc") - SetLink("direct") SetDNS("9.9.9.9:53") SetVP8Options(-1, 999) SetLivenessOptions(2500, 750, -1) @@ -91,7 +90,7 @@ func TestDefaultsAndSetters(t *testing.T) { mu.Lock() got := defaults mu.Unlock() - if got.transport != dataTransport || got.link != defaultLink || got.dnsServer != "9.9.9.9:53" || + if got.transport != dataTransport || got.dnsServer != "9.9.9.9:53" || got.vp8FPS != 1 || got.vp8BatchSize != 64 || got.livenessInterval != 2500*time.Millisecond || got.livenessTimeout != 750*time.Millisecond || got.livenessFailures != control.DefaultFailures { @@ -178,16 +177,16 @@ func TestStartWithInjectedRunnerLifecycle(t *testing.T) { runClientWithReady = func(ctx context.Context, cfg client.Config, onReady func()) error { opts, _ := cfg.TransportOptions.(vp8channel.Options) - if cfg.Link != defaultLink || cfg.Transport != dataTransport || cfg.Carrier != carrierJazz || + if cfg.Transport != dataTransport || cfg.Carrier != carrierJazz || cfg.RoomURL != "any" || cfg.DeviceID != "client" || cfg.LocalAddr != "127.0.0.1:1080" || cfg.DNSServer != defaultDNSServer || opts.FPS != 60 || opts.BatchSize != 8 || cfg.Liveness.Interval != 2500*time.Millisecond || cfg.Liveness.Timeout != 750*time.Millisecond || cfg.Liveness.Failures != 4 { t.Fatalf( - "RunWithReady args mismatch: link=%q transport=%q carrier=%q room=%q client=%q "+ + "RunWithReady args mismatch: transport=%q carrier=%q room=%q client=%q "+ "local=%q dns=%q vp8=%d/%d liveness=%+v", - cfg.Link, cfg.Transport, cfg.Carrier, cfg.RoomURL, cfg.DeviceID, + cfg.Transport, cfg.Carrier, cfg.RoomURL, cfg.DeviceID, cfg.LocalAddr, cfg.DNSServer, opts.FPS, opts.BatchSize, cfg.Liveness, ) } diff --git a/pkg/olcrtc/tunnel/tunnel.go b/pkg/olcrtc/tunnel/tunnel.go index 3d4c0e7..3db8d3a 100644 --- a/pkg/olcrtc/tunnel/tunnel.go +++ b/pkg/olcrtc/tunnel/tunnel.go @@ -5,7 +5,6 @@ // authorization and observability via the [Config] hooks: // // srv := tunnel.New(tunnel.Config{ -// Link: "direct", // Transport: "datachannel", // Carrier: "jitsi", // RoomURL: "https://meet.cryptopro.ru/myroom", @@ -72,7 +71,6 @@ type TrafficFunc = server.TrafficFunc // Config holds runtime server configuration. type Config struct { // --- carrier selection --- - Link string // currently only "direct" Transport string // datachannel, videochannel, seichannel, vp8channel Carrier string // jitsi, telemost, jazz, wbstream, none RoomURL string // conference room identifier for the carrier @@ -120,7 +118,6 @@ func New(cfg Config) *Server { // Run starts the server and blocks until ctx is cancelled or the carrier ends. func (s *Server) Run(ctx context.Context) error { if err := server.Run(ctx, server.Config{ - Link: s.cfg.Link, Transport: s.cfg.Transport, Carrier: s.cfg.Carrier, RoomURL: s.cfg.RoomURL, diff --git a/pkg/olcrtc/tunnel/tunnel_test.go b/pkg/olcrtc/tunnel/tunnel_test.go index 17beeb6..d0e785c 100644 --- a/pkg/olcrtc/tunnel/tunnel_test.go +++ b/pkg/olcrtc/tunnel/tunnel_test.go @@ -13,7 +13,6 @@ var errNo = errors.New("no") func TestRun_FailsWithoutKey(t *testing.T) { tunnel.RegisterDefaults() err := tunnel.New(tunnel.Config{ - Link: "direct", Transport: "datachannel", Carrier: "telemost", RoomURL: "room-1",