From f9ad12c733d02af7c3bebc3eedf9dd57269bc219 Mon Sep 17 00:00:00 2001 From: zarazaex69 Date: Mon, 20 Apr 2026 20:09:00 +0300 Subject: [PATCH 01/10] refactor: extract session runtime wiring --- cmd/olcrtc/main.go | 119 +++++---------------------- internal/app/session/session.go | 138 ++++++++++++++++++++++++++++++++ 2 files changed, 156 insertions(+), 101 deletions(-) create mode 100644 internal/app/session/session.go diff --git a/cmd/olcrtc/main.go b/cmd/olcrtc/main.go index 3785c56..402a8a7 100644 --- a/cmd/olcrtc/main.go +++ b/cmd/olcrtc/main.go @@ -3,7 +3,6 @@ package main import ( "context" - "errors" "flag" "fmt" "os" @@ -12,16 +11,9 @@ import ( "syscall" "time" - "github.com/openlibrecommunity/olcrtc/internal/client" + "github.com/openlibrecommunity/olcrtc/internal/app/session" "github.com/openlibrecommunity/olcrtc/internal/logger" "github.com/openlibrecommunity/olcrtc/internal/names" - "github.com/openlibrecommunity/olcrtc/internal/provider" - "github.com/openlibrecommunity/olcrtc/internal/provider/jazz" - "github.com/openlibrecommunity/olcrtc/internal/provider/telemost" - "github.com/openlibrecommunity/olcrtc/internal/provider/wbstream" - "github.com/openlibrecommunity/olcrtc/internal/server" - "github.com/openlibrecommunity/olcrtc/internal/transport" - "github.com/openlibrecommunity/olcrtc/internal/transport/datachannel" ) type config struct { @@ -39,14 +31,6 @@ type config struct { socksProxyPort int } -var ( - errRoomIDRequired = errors.New("room ID required") - errModeRequired = errors.New("specify -mode srv or -mode cnc") - errProviderRequired = errors.New("provider required (use -provider telemost or -provider jazz)") - errUnsupportedProvider = errors.New("unsupported provider") - errUnsupportedTransport = errors.New("unsupported transport") -) - func main() { if err := run(); err != nil { logger.Error(err) @@ -55,15 +39,12 @@ func main() { } func run() error { - provider.Register("jazz", jazz.New) - provider.Register("telemost", telemost.New) - provider.Register("wb_stream", wbstream.New) - transport.Register("datachannel", datachannel.New) + session.RegisterDefaults() cfg := parseFlags() configureLogging(cfg.debug) - if err := validateConfig(cfg); err != nil { + if err := session.Validate(toSessionConfig(cfg)); err != nil { return err } @@ -83,7 +64,9 @@ func run() error { signal.Notify(sigCh, os.Interrupt, syscall.SIGTERM) errCh := make(chan error, 1) - go runMode(ctx, cfg, errCh) + go func() { + errCh <- session.Run(ctx, toSessionConfig(cfg)) + }() select { case <-sigCh: @@ -121,41 +104,6 @@ func configureLogging(debug bool) { } } -func validateConfig(cfg config) error { - availableProviders := provider.Available() - validProvider := false - for _, p := range availableProviders { - if cfg.provider == p { - validProvider = true - break - } - } - - availableTransports := transport.Available() - validTransport := false - for _, t := range availableTransports { - if cfg.transport == t { - validTransport = true - break - } - } - - switch { - case cfg.provider == "": - return errProviderRequired - case !validProvider: - return fmt.Errorf("%w: %s (available: %v)", errUnsupportedProvider, cfg.provider, availableProviders) - case !validTransport: - return fmt.Errorf("%w: %s (available: %v)", errUnsupportedTransport, cfg.transport, availableTransports) - case cfg.roomID == "" && cfg.provider != "jazz": - return errRoomIDRequired - case cfg.mode != "srv" && cfg.mode != "cnc": - return errModeRequired - default: - return nil - } -} - func resolveDataDir(dataDir string) (string, error) { if filepath.IsAbs(dataDir) { return dataDir, nil @@ -179,49 +127,18 @@ func loadNames(dataDir string) error { return nil } -func runMode(ctx context.Context, cfg config, errCh chan<- error) { - roomURL := buildRoomURL(cfg.provider, cfg.roomID) - - switch cfg.mode { - case "srv": - errCh <- server.Run( - ctx, - cfg.transport, - cfg.provider, - roomURL, - cfg.keyHex, - cfg.dnsServer, - cfg.socksProxyAddr, - cfg.socksProxyPort, - ) - case "cnc": - errCh <- client.Run( - ctx, - cfg.transport, - cfg.provider, - roomURL, - cfg.keyHex, - fmt.Sprintf("%s:%d", cfg.socksHost, cfg.socksPort), - cfg.dnsServer, - "", - "", - ) - } -} - -func buildRoomURL(providerName, roomID string) string { - switch providerName { - case "telemost": - return "https://telemost.yandex.ru/j/" + roomID - case "jazz": - if roomID == "" { - return "any" - } - return roomID - case "wb_stream": - return roomID - default: - return roomID +func toSessionConfig(cfg config) session.Config { + return session.Config{ + Mode: cfg.mode, + Transport: cfg.transport, + Provider: cfg.provider, + RoomID: cfg.roomID, + KeyHex: cfg.keyHex, + SOCKSHost: cfg.socksHost, + SOCKSPort: cfg.socksPort, + DNSServer: cfg.dnsServer, + SOCKSProxyAddr: cfg.socksProxyAddr, + SOCKSProxyPort: cfg.socksProxyPort, } } diff --git a/internal/app/session/session.go b/internal/app/session/session.go new file mode 100644 index 0000000..f183956 --- /dev/null +++ b/internal/app/session/session.go @@ -0,0 +1,138 @@ +// Package session wires runtime configuration to application mode entrypoints. +package session + +import ( + "context" + "errors" + "fmt" + + "github.com/openlibrecommunity/olcrtc/internal/client" + "github.com/openlibrecommunity/olcrtc/internal/provider" + "github.com/openlibrecommunity/olcrtc/internal/provider/jazz" + "github.com/openlibrecommunity/olcrtc/internal/provider/telemost" + "github.com/openlibrecommunity/olcrtc/internal/provider/wbstream" + "github.com/openlibrecommunity/olcrtc/internal/server" + "github.com/openlibrecommunity/olcrtc/internal/transport" + "github.com/openlibrecommunity/olcrtc/internal/transport/datachannel" +) + +var ( + // ErrRoomIDRequired indicates that a room id is required for the selected provider. + ErrRoomIDRequired = errors.New("room ID required") + // ErrModeRequired indicates that mode is not one of the supported values. + ErrModeRequired = errors.New("specify -mode srv or -mode cnc") + // ErrProviderRequired indicates that no provider was selected. + ErrProviderRequired = errors.New("provider required (use -provider telemost or -provider jazz)") + // ErrUnsupportedProvider indicates that provider is not registered. + ErrUnsupportedProvider = errors.New("unsupported provider") + // ErrUnsupportedTransport indicates that transport is not registered. + ErrUnsupportedTransport = errors.New("unsupported transport") +) + +// Config holds runtime session settings. +type Config struct { + Mode string + Transport string + Provider string + RoomID string + KeyHex string + SOCKSHost string + SOCKSPort int + DNSServer string + SOCKSProxyAddr string + SOCKSProxyPort int +} + +// RegisterDefaults registers built-in providers and transports. +func RegisterDefaults() { + provider.Register("jazz", jazz.New) + provider.Register("telemost", telemost.New) + provider.Register("wb_stream", wbstream.New) + + transport.Register("datachannel", datachannel.New) +} + +// Validate verifies that the runtime config refers to registered components. +func Validate(cfg Config) error { + availableProviders := provider.Available() + validProvider := false + for _, p := range availableProviders { + if cfg.Provider == p { + validProvider = true + break + } + } + + availableTransports := transport.Available() + validTransport := false + for _, t := range availableTransports { + if cfg.Transport == t { + validTransport = true + break + } + } + + switch { + case cfg.Provider == "": + return ErrProviderRequired + case !validProvider: + return fmt.Errorf("%w: %s (available: %v)", ErrUnsupportedProvider, cfg.Provider, availableProviders) + case !validTransport: + return fmt.Errorf("%w: %s (available: %v)", ErrUnsupportedTransport, cfg.Transport, availableTransports) + case cfg.RoomID == "" && cfg.Provider != "jazz": + return ErrRoomIDRequired + case cfg.Mode != "srv" && cfg.Mode != "cnc": + return ErrModeRequired + default: + return nil + } +} + +// Run starts the configured mode. +func Run(ctx context.Context, cfg Config) error { + roomURL := buildRoomURL(cfg.Provider, cfg.RoomID) + + switch cfg.Mode { + case "srv": + return server.Run( + ctx, + cfg.Transport, + cfg.Provider, + roomURL, + cfg.KeyHex, + cfg.DNSServer, + cfg.SOCKSProxyAddr, + cfg.SOCKSProxyPort, + ) + case "cnc": + return client.Run( + ctx, + cfg.Transport, + cfg.Provider, + roomURL, + cfg.KeyHex, + fmt.Sprintf("%s:%d", cfg.SOCKSHost, cfg.SOCKSPort), + cfg.DNSServer, + "", + "", + ) + default: + return ErrModeRequired + } +} + +func buildRoomURL(providerName, roomID string) string { + switch providerName { + case "telemost": + return "https://telemost.yandex.ru/j/" + roomID + case "jazz": + if roomID == "" { + return "any" + } + return roomID + case "wb_stream": + return roomID + default: + return roomID + } +} From 27b94b6692396ccc50c17bee3948eb1b7ec9c91d Mon Sep 17 00:00:00 2001 From: zarazaex69 Date: Mon, 20 Apr 2026 20:11:02 +0300 Subject: [PATCH 02/10] refactor: introduce carrier facade --- cmd/olcrtc/main.go | 15 ++++++- internal/app/session/session.go | 48 ++++++++++----------- internal/carrier/carrier.go | 32 ++++++++++++++ internal/transport/datachannel/transport.go | 24 +++++------ 4 files changed, 81 insertions(+), 38 deletions(-) create mode 100644 internal/carrier/carrier.go diff --git a/cmd/olcrtc/main.go b/cmd/olcrtc/main.go index 402a8a7..6306a0d 100644 --- a/cmd/olcrtc/main.go +++ b/cmd/olcrtc/main.go @@ -19,6 +19,7 @@ import ( type config struct { mode string transport string + carrier string roomID string provider string socksPort int @@ -83,8 +84,9 @@ func parseFlags() config { flag.StringVar(&cfg.mode, "mode", "", "Mode: srv or cnc") flag.StringVar(&cfg.transport, "transport", "datachannel", "Transport: datachannel") + flag.StringVar(&cfg.carrier, "carrier", "", "Carrier: telemost, jazz, wb_stream") flag.StringVar(&cfg.roomID, "id", "", "Room ID") - flag.StringVar(&cfg.provider, "provider", "", "Provider: telemost or jazz (required)") + flag.StringVar(&cfg.provider, "provider", "", "Deprecated alias for -carrier") flag.IntVar(&cfg.socksPort, "socks-port", 1080, "SOCKS5 port (client only)") flag.StringVar(&cfg.socksHost, "socks-host", "127.0.0.1", "SOCKS5 listen host (client only)") flag.StringVar(&cfg.keyHex, "key", "", "Shared encryption key (hex)") @@ -131,7 +133,7 @@ func toSessionConfig(cfg config) session.Config { return session.Config{ Mode: cfg.mode, Transport: cfg.transport, - Provider: cfg.provider, + Carrier: firstNonEmpty(cfg.carrier, cfg.provider), RoomID: cfg.roomID, KeyHex: cfg.keyHex, SOCKSHost: cfg.socksHost, @@ -142,6 +144,15 @@ func toSessionConfig(cfg config) session.Config { } } +func firstNonEmpty(values ...string) string { + for _, value := range values { + if value != "" { + return value + } + } + return "" +} + func waitForShutdown(errCh <-chan error) error { done := make(chan error, 1) go func() { diff --git a/internal/app/session/session.go b/internal/app/session/session.go index f183956..5685d98 100644 --- a/internal/app/session/session.go +++ b/internal/app/session/session.go @@ -6,8 +6,8 @@ import ( "errors" "fmt" + "github.com/openlibrecommunity/olcrtc/internal/carrier" "github.com/openlibrecommunity/olcrtc/internal/client" - "github.com/openlibrecommunity/olcrtc/internal/provider" "github.com/openlibrecommunity/olcrtc/internal/provider/jazz" "github.com/openlibrecommunity/olcrtc/internal/provider/telemost" "github.com/openlibrecommunity/olcrtc/internal/provider/wbstream" @@ -21,10 +21,10 @@ var ( ErrRoomIDRequired = errors.New("room ID required") // ErrModeRequired indicates that mode is not one of the supported values. ErrModeRequired = errors.New("specify -mode srv or -mode cnc") - // ErrProviderRequired indicates that no provider was selected. - ErrProviderRequired = errors.New("provider required (use -provider telemost or -provider jazz)") - // ErrUnsupportedProvider indicates that provider is not registered. - ErrUnsupportedProvider = errors.New("unsupported provider") + // ErrCarrierRequired indicates that no carrier was selected. + ErrCarrierRequired = errors.New("carrier required (use -carrier telemost or -carrier jazz)") + // ErrUnsupportedCarrier indicates that carrier is not registered. + ErrUnsupportedCarrier = errors.New("unsupported carrier") // ErrUnsupportedTransport indicates that transport is not registered. ErrUnsupportedTransport = errors.New("unsupported transport") ) @@ -33,7 +33,7 @@ var ( type Config struct { Mode string Transport string - Provider string + Carrier string RoomID string KeyHex string SOCKSHost string @@ -45,20 +45,20 @@ type Config struct { // RegisterDefaults registers built-in providers and transports. func RegisterDefaults() { - provider.Register("jazz", jazz.New) - provider.Register("telemost", telemost.New) - provider.Register("wb_stream", wbstream.New) + carrier.Register("jazz", jazz.New) + carrier.Register("telemost", telemost.New) + carrier.Register("wb_stream", wbstream.New) transport.Register("datachannel", datachannel.New) } // Validate verifies that the runtime config refers to registered components. func Validate(cfg Config) error { - availableProviders := provider.Available() - validProvider := false - for _, p := range availableProviders { - if cfg.Provider == p { - validProvider = true + availableCarriers := carrier.Available() + validCarrier := false + for _, c := range availableCarriers { + if cfg.Carrier == c { + validCarrier = true break } } @@ -73,13 +73,13 @@ func Validate(cfg Config) error { } switch { - case cfg.Provider == "": - return ErrProviderRequired - case !validProvider: - return fmt.Errorf("%w: %s (available: %v)", ErrUnsupportedProvider, cfg.Provider, availableProviders) + case cfg.Carrier == "": + return ErrCarrierRequired + case !validCarrier: + return fmt.Errorf("%w: %s (available: %v)", ErrUnsupportedCarrier, cfg.Carrier, availableCarriers) case !validTransport: return fmt.Errorf("%w: %s (available: %v)", ErrUnsupportedTransport, cfg.Transport, availableTransports) - case cfg.RoomID == "" && cfg.Provider != "jazz": + case cfg.RoomID == "" && cfg.Carrier != "jazz": return ErrRoomIDRequired case cfg.Mode != "srv" && cfg.Mode != "cnc": return ErrModeRequired @@ -90,14 +90,14 @@ func Validate(cfg Config) error { // Run starts the configured mode. func Run(ctx context.Context, cfg Config) error { - roomURL := buildRoomURL(cfg.Provider, cfg.RoomID) + roomURL := buildRoomURL(cfg.Carrier, cfg.RoomID) switch cfg.Mode { case "srv": return server.Run( ctx, cfg.Transport, - cfg.Provider, + cfg.Carrier, roomURL, cfg.KeyHex, cfg.DNSServer, @@ -108,7 +108,7 @@ func Run(ctx context.Context, cfg Config) error { return client.Run( ctx, cfg.Transport, - cfg.Provider, + cfg.Carrier, roomURL, cfg.KeyHex, fmt.Sprintf("%s:%d", cfg.SOCKSHost, cfg.SOCKSPort), @@ -121,8 +121,8 @@ func Run(ctx context.Context, cfg Config) error { } } -func buildRoomURL(providerName, roomID string) string { - switch providerName { +func buildRoomURL(carrierName, roomID string) string { + switch carrierName { case "telemost": return "https://telemost.yandex.ru/j/" + roomID case "jazz": diff --git a/internal/carrier/carrier.go b/internal/carrier/carrier.go new file mode 100644 index 0000000..0e0cf9d --- /dev/null +++ b/internal/carrier/carrier.go @@ -0,0 +1,32 @@ +// Package carrier exposes carrier-oriented registration and construction APIs. +package carrier + +import ( + "context" + + "github.com/openlibrecommunity/olcrtc/internal/provider" +) + +// Carrier is the current carrier implementation contract. +type Carrier = provider.Provider + +// Config holds carrier configuration. +type Config = provider.Config + +// Factory creates a new carrier instance. +type Factory = provider.Factory + +// Register adds a carrier factory to the registry. +func Register(name string, factory Factory) { + provider.Register(name, factory) +} + +// New creates a carrier instance by name. +func New(ctx context.Context, name string, cfg Config) (Carrier, error) { + return provider.New(ctx, name, cfg) +} + +// Available returns a list of registered carriers. +func Available() []string { + return provider.Available() +} diff --git a/internal/transport/datachannel/transport.go b/internal/transport/datachannel/transport.go index dfd1b2d..f57e6ca 100644 --- a/internal/transport/datachannel/transport.go +++ b/internal/transport/datachannel/transport.go @@ -5,18 +5,18 @@ import ( "context" "fmt" - "github.com/openlibrecommunity/olcrtc/internal/provider" + "github.com/openlibrecommunity/olcrtc/internal/carrier" "github.com/openlibrecommunity/olcrtc/internal/transport" "github.com/pion/webrtc/v4" ) type providerTransport struct { - provider provider.Provider + carrier carrier.Carrier } // New creates a datachannel transport backed by a carrier-specific provider. func New(ctx context.Context, cfg transport.Config) (transport.Transport, error) { - p, err := provider.New(ctx, cfg.Carrier, provider.Config{ + c, err := carrier.New(ctx, cfg.Carrier, carrier.Config{ RoomURL: cfg.RoomURL, Name: cfg.Name, OnData: cfg.OnData, @@ -28,27 +28,27 @@ func New(ctx context.Context, cfg transport.Config) (transport.Transport, error) return nil, fmt.Errorf("create provider transport: %w", err) } - return &providerTransport{provider: p}, nil + return &providerTransport{carrier: c}, nil } // Connect starts the transport connection. func (p *providerTransport) Connect(ctx context.Context) error { - return p.provider.Connect(ctx) + return p.carrier.Connect(ctx) } // Send transmits data through the transport. func (p *providerTransport) Send(data []byte) error { - return p.provider.Send(data) + return p.carrier.Send(data) } // Close terminates the transport. func (p *providerTransport) Close() error { - return p.provider.Close() + return p.carrier.Close() } // SetReconnectCallback registers reconnect handling. func (p *providerTransport) SetReconnectCallback(cb func()) { - p.provider.SetReconnectCallback(func(_ *webrtc.DataChannel) { + p.carrier.SetReconnectCallback(func(_ *webrtc.DataChannel) { if cb != nil { cb() } @@ -57,20 +57,20 @@ func (p *providerTransport) SetReconnectCallback(cb func()) { // SetShouldReconnect configures reconnect policy. func (p *providerTransport) SetShouldReconnect(fn func() bool) { - p.provider.SetShouldReconnect(fn) + p.carrier.SetShouldReconnect(fn) } // SetEndedCallback registers end-of-session handling. func (p *providerTransport) SetEndedCallback(cb func(string)) { - p.provider.SetEndedCallback(cb) + p.carrier.SetEndedCallback(cb) } // WatchConnection monitors connection lifecycle. func (p *providerTransport) WatchConnection(ctx context.Context) { - p.provider.WatchConnection(ctx) + p.carrier.WatchConnection(ctx) } // CanSend reports whether transport is ready for sending. func (p *providerTransport) CanSend() bool { - return p.provider.CanSend() + return p.carrier.CanSend() } From 9bd9503daa329e8a9a15616ca7f4d755e65e3944 Mon Sep 17 00:00:00 2001 From: zarazaex69 Date: Mon, 20 Apr 2026 20:13:49 +0300 Subject: [PATCH 03/10] refactor: add direct link layer --- internal/app/session/session.go | 3 ++ internal/client/client.go | 41 ++++++++++++------------ internal/link/direct/direct.go | 43 ++++++++++++++++++++++++++ internal/link/link.go | 55 +++++++++++++++++++++++++++++++++ internal/server/server.go | 41 ++++++++++++------------ 5 files changed, 143 insertions(+), 40 deletions(-) create mode 100644 internal/link/direct/direct.go create mode 100644 internal/link/link.go diff --git a/internal/app/session/session.go b/internal/app/session/session.go index 5685d98..9127d26 100644 --- a/internal/app/session/session.go +++ b/internal/app/session/session.go @@ -8,6 +8,8 @@ import ( "github.com/openlibrecommunity/olcrtc/internal/carrier" "github.com/openlibrecommunity/olcrtc/internal/client" + "github.com/openlibrecommunity/olcrtc/internal/link" + "github.com/openlibrecommunity/olcrtc/internal/link/direct" "github.com/openlibrecommunity/olcrtc/internal/provider/jazz" "github.com/openlibrecommunity/olcrtc/internal/provider/telemost" "github.com/openlibrecommunity/olcrtc/internal/provider/wbstream" @@ -49,6 +51,7 @@ func RegisterDefaults() { carrier.Register("telemost", telemost.New) carrier.Register("wb_stream", wbstream.New) + link.Register("direct", direct.New) transport.Register("datachannel", datachannel.New) } diff --git a/internal/client/client.go b/internal/client/client.go index 8d57e4a..0aa38a2 100644 --- a/internal/client/client.go +++ b/internal/client/client.go @@ -16,10 +16,10 @@ import ( "time" "github.com/openlibrecommunity/olcrtc/internal/crypto" + "github.com/openlibrecommunity/olcrtc/internal/link" "github.com/openlibrecommunity/olcrtc/internal/logger" "github.com/openlibrecommunity/olcrtc/internal/mux" "github.com/openlibrecommunity/olcrtc/internal/names" - "github.com/openlibrecommunity/olcrtc/internal/transport" ) var ( @@ -43,7 +43,7 @@ var ( // Client handles local SOCKS5 connections and tunnels them via WebRTC. type Client struct { - transports []transport.Transport + links []link.Link cipher *crypto.Cipher mux *mux.Multiplexer connections map[uint16]net.Conn @@ -100,7 +100,7 @@ func RunWithReady( c := &Client{ cipher: cipher, connections: make(map[uint16]net.Conn), - transports: make([]transport.Transport, 0), + links: make([]link.Link, 0), clientID: clientID, dnsServer: dnsServer, } @@ -161,8 +161,8 @@ func (c *Client) setupMux() { c.mux = mux.New(c.clientID, func(frame []byte) error { for { canSend := true - for _, tr := range c.transports { - if !tr.CanSend() { + for _, ln := range c.links { + if !ln.CanSend() { canSend = false break } @@ -177,11 +177,11 @@ func (c *Client) setupMux() { if err != nil { return fmt.Errorf("%w: %w", ErrEncryptFailed, err) } - if len(c.transports) == 0 { + if len(c.links) == 0 { return ErrNoPeers } - idx := c.peerIdx.Add(1) % uint32(len(c.transports)) //nolint:gosec - return c.transports[idx].Send(encrypted) + idx := c.peerIdx.Add(1) % uint32(len(c.links)) //nolint:gosec + return c.links[idx].Send(encrypted) }) } @@ -196,7 +196,8 @@ func (c *Client) addTransport( socksProxyAddr string, socksProxyPort int, ) error { - tr, err := transport.New(ctx, transportName, transport.Config{ + ln, err := link.New(ctx, "direct", link.Config{ + Transport: transportName, Carrier: providerName, RoomURL: roomURL, Name: names.Generate(), @@ -206,21 +207,21 @@ func (c *Client) addTransport( ProxyPort: socksProxyPort, }) if err != nil { - return fmt.Errorf("failed to create transport: %w", err) + return fmt.Errorf("failed to create link: %w", err) } - tr.SetEndedCallback(func(reason string) { + ln.SetEndedCallback(func(reason string) { logger.Infof("Client transport %d reported conference end: %s", peerID, reason) cancel() }) - c.transports = append(c.transports, tr) + c.links = append(c.links, ln) - tr.SetReconnectCallback(func() { + ln.SetReconnectCallback(func() { c.handleTransportReconnect(peerID) }) logger.Infof("Connecting transport %d via %s/%s...", peerID, transportName, providerName) - if err := tr.Connect(ctx); err != nil { + if err := ln.Connect(ctx); err != nil { return fmt.Errorf("failed to connect transport: %w", err) } logger.Infof("Transport %d connected", peerID) @@ -228,7 +229,7 @@ func (c *Client) addTransport( c.wg.Add(1) go func() { defer c.wg.Done() - tr.WatchConnection(ctx) + ln.WatchConnection(ctx) }() // Send initial reset to clean up any stale connections for this clientID on server @@ -256,11 +257,11 @@ func (c *Client) handleTransportReconnect(peerID int) { if err != nil { return fmt.Errorf("%w: %w", ErrEncryptFailed, err) } - if len(c.transports) == 0 { + if len(c.links) == 0 { return ErrNoPeers } - idx := c.peerIdx.Add(1) % uint32(len(c.transports)) //nolint:gosec - return c.transports[idx].Send(encrypted) + idx := c.peerIdx.Add(1) % uint32(len(c.links)) //nolint:gosec + return c.links[idx].Send(encrypted) }) c.mux.Reset() @@ -443,7 +444,7 @@ func (c *Client) shutdown() { } c.connMu.Unlock() - for i, tr := range c.transports { + for i, tr := range c.links { logger.Infof("closing transport %d", i) _ = tr.Close() } @@ -516,7 +517,7 @@ func (c *Client) startStreamPump(ctx context.Context, sid uint16, conn net.Conn) } func (c *Client) canSendData() bool { - for _, tr := range c.transports { + for _, tr := range c.links { if !tr.CanSend() { return false } diff --git a/internal/link/direct/direct.go b/internal/link/direct/direct.go new file mode 100644 index 0000000..40b318b --- /dev/null +++ b/internal/link/direct/direct.go @@ -0,0 +1,43 @@ +// Package direct provides a pass-through link implementation above transports. +package direct + +import ( + "context" + "fmt" + + "github.com/openlibrecommunity/olcrtc/internal/link" + "github.com/openlibrecommunity/olcrtc/internal/transport" +) + +type directLink struct { + transport transport.Transport +} + +// New creates a direct link that forwards bytes to the selected transport. +func New(ctx context.Context, cfg link.Config) (link.Link, error) { + tr, err := transport.New(ctx, cfg.Transport, transport.Config{ + Carrier: cfg.Carrier, + RoomURL: cfg.RoomURL, + Name: cfg.Name, + OnData: cfg.OnData, + DNSServer: cfg.DNSServer, + ProxyAddr: cfg.ProxyAddr, + ProxyPort: cfg.ProxyPort, + }) + if err != nil { + return nil, fmt.Errorf("create transport for direct link: %w", err) + } + + return &directLink{transport: tr}, nil +} + +func (d *directLink) Connect(ctx context.Context) error { return d.transport.Connect(ctx) } +func (d *directLink) Send(data []byte) error { return d.transport.Send(data) } +func (d *directLink) Close() error { return d.transport.Close() } +func (d *directLink) SetReconnectCallback(cb func()) { d.transport.SetReconnectCallback(cb) } +func (d *directLink) SetShouldReconnect(fn func() bool) { d.transport.SetShouldReconnect(fn) } +func (d *directLink) SetEndedCallback(cb func(string)) { d.transport.SetEndedCallback(cb) } +func (d *directLink) WatchConnection(ctx context.Context) { + d.transport.WatchConnection(ctx) +} +func (d *directLink) CanSend() bool { return d.transport.CanSend() } diff --git a/internal/link/link.go b/internal/link/link.go new file mode 100644 index 0000000..7d671c6 --- /dev/null +++ b/internal/link/link.go @@ -0,0 +1,55 @@ +// Package link defines link-layer abstractions above transports. +package link + +import ( + "context" + "errors" +) + +var ( + // ErrLinkNotFound is returned when a requested link is not registered. + ErrLinkNotFound = errors.New("link not found") +) + +// Link defines a byte link above a transport. +type Link interface { + Connect(ctx context.Context) error + Send(data []byte) error + Close() error + SetReconnectCallback(cb func()) + SetShouldReconnect(fn func() bool) + SetEndedCallback(cb func(string)) + WatchConnection(ctx context.Context) + CanSend() bool +} + +// Config holds common link configuration. +type Config struct { + Transport string + Carrier string + RoomURL string + Name string + OnData func([]byte) + DNSServer string + ProxyAddr string + ProxyPort int +} + +// Factory creates a link instance. +type Factory func(ctx context.Context, cfg Config) (Link, error) + +var registry = make(map[string]Factory) + +// Register adds a link factory to the registry. +func Register(name string, factory Factory) { + registry[name] = factory +} + +// New creates a link instance by name. +func New(ctx context.Context, name string, cfg Config) (Link, error) { + factory, ok := registry[name] + if !ok { + return nil, ErrLinkNotFound + } + return factory(ctx, cfg) +} diff --git a/internal/server/server.go b/internal/server/server.go index 0543052..e17c650 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -17,10 +17,10 @@ import ( "time" "github.com/openlibrecommunity/olcrtc/internal/crypto" + "github.com/openlibrecommunity/olcrtc/internal/link" "github.com/openlibrecommunity/olcrtc/internal/logger" "github.com/openlibrecommunity/olcrtc/internal/mux" "github.com/openlibrecommunity/olcrtc/internal/names" - "github.com/openlibrecommunity/olcrtc/internal/transport" ) var ( @@ -42,7 +42,7 @@ var ( // Server handles incoming WebRTC connections and proxies their traffic. type Server struct { - transports []transport.Transport + links []link.Link cipher *crypto.Cipher mux *mux.Multiplexer connections map[uint16]net.Conn @@ -88,7 +88,7 @@ func Run( cipher: cipher, connections: make(map[uint16]net.Conn), streamPumps: make(map[uint16]net.Conn), - transports: make([]transport.Transport, 0), + links: make([]link.Link, 0), dnsServer: dnsServer, socksProxyAddr: socksProxyAddr, socksProxyPort: socksProxyPort, @@ -161,8 +161,8 @@ func (s *Server) setupMux() { s.mux = mux.New(0, func(frame []byte) error { for { canSend := true - for _, tr := range s.transports { - if !tr.CanSend() { + for _, ln := range s.links { + if !ln.CanSend() { canSend = false break } @@ -177,11 +177,11 @@ func (s *Server) setupMux() { if err != nil { return fmt.Errorf("%w: %w", ErrEncryptFailed, err) } - if len(s.transports) == 0 { + if len(s.links) == 0 { return ErrNoPeers } - idx := s.peerIdx.Add(1) % uint32(len(s.transports)) //nolint:gosec - return s.transports[idx].Send(encrypted) + idx := s.peerIdx.Add(1) % uint32(len(s.links)) //nolint:gosec + return s.links[idx].Send(encrypted) }) } @@ -193,7 +193,8 @@ func (s *Server) addTransport( peerID int, cancel context.CancelFunc, ) error { - tr, err := transport.New(ctx, transportName, transport.Config{ + ln, err := link.New(ctx, "direct", link.Config{ + Transport: transportName, Carrier: providerName, RoomURL: roomURL, Name: names.Generate(), @@ -203,21 +204,21 @@ func (s *Server) addTransport( ProxyPort: s.socksProxyPort, }) if err != nil { - return fmt.Errorf("failed to create transport: %w", err) + return fmt.Errorf("failed to create link: %w", err) } - tr.SetEndedCallback(func(reason string) { + ln.SetEndedCallback(func(reason string) { logger.Infof("Server transport %d reported conference end: %s", peerID, reason) cancel() }) - s.transports = append(s.transports, tr) + s.links = append(s.links, ln) - tr.SetReconnectCallback(func() { + ln.SetReconnectCallback(func() { s.handleTransportReconnect(peerID) }) logger.Infof("Connecting transport %d via %s/%s...", peerID, transportName, providerName) - if err := tr.Connect(ctx); err != nil { + if err := ln.Connect(ctx); err != nil { return fmt.Errorf("failed to connect transport: %w", err) } logger.Infof("Transport %d connected", peerID) @@ -225,7 +226,7 @@ func (s *Server) addTransport( s.wg.Add(1) go func() { defer s.wg.Done() - tr.WatchConnection(ctx) + ln.WatchConnection(ctx) }() return nil } @@ -247,11 +248,11 @@ func (s *Server) handleTransportReconnect(peerID int) { if err != nil { return fmt.Errorf("%w: %w", ErrEncryptFailed, err) } - if len(s.transports) == 0 { + if len(s.links) == 0 { return ErrNoPeers } - idx := s.peerIdx.Add(1) % uint32(len(s.transports)) //nolint:gosec - return s.transports[idx].Send(encrypted) + idx := s.peerIdx.Add(1) % uint32(len(s.links)) //nolint:gosec + return s.links[idx].Send(encrypted) }) s.mux.Reset() } @@ -349,7 +350,7 @@ func (s *Server) shutdown() { } s.connMu.Unlock() - for i, tr := range s.transports { + for i, tr := range s.links { logger.Infof("closing transport %d", i) _ = tr.Close() } @@ -561,7 +562,7 @@ func (s *Server) startStreamPump(ctx context.Context, sid uint16, conn net.Conn) } func (s *Server) canSendData() bool { - for _, tr := range s.transports { + for _, tr := range s.links { if !tr.CanSend() { return false } From 033bdcdac53a1e78f835928f83e48c2de58bdd67 Mon Sep 17 00:00:00 2001 From: zarazaex69 Date: Mon, 20 Apr 2026 20:17:04 +0300 Subject: [PATCH 04/10] refactor: add carrier byte stream capability --- internal/carrier/bytestream.go | 47 +++++++++++++++++++++ internal/transport/datachannel/transport.go | 43 +++++++++---------- 2 files changed, 66 insertions(+), 24 deletions(-) create mode 100644 internal/carrier/bytestream.go diff --git a/internal/carrier/bytestream.go b/internal/carrier/bytestream.go new file mode 100644 index 0000000..e7967a4 --- /dev/null +++ b/internal/carrier/bytestream.go @@ -0,0 +1,47 @@ +package carrier + +import ( + "context" + + "github.com/pion/webrtc/v4" +) + +// ByteStream is a carrier capability for bidirectional byte transport. +type ByteStream interface { + Connect(ctx context.Context) error + Send(data []byte) error + Close() error + SetReconnectCallback(cb func()) + SetShouldReconnect(fn func() bool) + SetEndedCallback(cb func(string)) + WatchConnection(ctx context.Context) + CanSend() bool +} + +type providerByteStream struct { + carrier Carrier +} + +// OpenByteStream adapts a carrier to a generic byte-stream capability. +func OpenByteStream(c Carrier) ByteStream { + return &providerByteStream{carrier: c} +} + +func (p *providerByteStream) Connect(ctx context.Context) error { return p.carrier.Connect(ctx) } +func (p *providerByteStream) Send(data []byte) error { return p.carrier.Send(data) } +func (p *providerByteStream) Close() error { return p.carrier.Close() } + +func (p *providerByteStream) SetReconnectCallback(cb func()) { + p.carrier.SetReconnectCallback(func(_ *webrtc.DataChannel) { + if cb != nil { + cb() + } + }) +} + +func (p *providerByteStream) SetShouldReconnect(fn func() bool) { p.carrier.SetShouldReconnect(fn) } +func (p *providerByteStream) SetEndedCallback(cb func(string)) { p.carrier.SetEndedCallback(cb) } +func (p *providerByteStream) WatchConnection(ctx context.Context) { + p.carrier.WatchConnection(ctx) +} +func (p *providerByteStream) CanSend() bool { return p.carrier.CanSend() } diff --git a/internal/transport/datachannel/transport.go b/internal/transport/datachannel/transport.go index f57e6ca..3faa4ca 100644 --- a/internal/transport/datachannel/transport.go +++ b/internal/transport/datachannel/transport.go @@ -7,11 +7,10 @@ import ( "github.com/openlibrecommunity/olcrtc/internal/carrier" "github.com/openlibrecommunity/olcrtc/internal/transport" - "github.com/pion/webrtc/v4" ) -type providerTransport struct { - carrier carrier.Carrier +type streamTransport struct { + stream carrier.ByteStream } // New creates a datachannel transport backed by a carrier-specific provider. @@ -28,49 +27,45 @@ func New(ctx context.Context, cfg transport.Config) (transport.Transport, error) return nil, fmt.Errorf("create provider transport: %w", err) } - return &providerTransport{carrier: c}, nil + return &streamTransport{stream: carrier.OpenByteStream(c)}, nil } // Connect starts the transport connection. -func (p *providerTransport) Connect(ctx context.Context) error { - return p.carrier.Connect(ctx) +func (p *streamTransport) Connect(ctx context.Context) error { + return p.stream.Connect(ctx) } // Send transmits data through the transport. -func (p *providerTransport) Send(data []byte) error { - return p.carrier.Send(data) +func (p *streamTransport) Send(data []byte) error { + return p.stream.Send(data) } // Close terminates the transport. -func (p *providerTransport) Close() error { - return p.carrier.Close() +func (p *streamTransport) Close() error { + return p.stream.Close() } // SetReconnectCallback registers reconnect handling. -func (p *providerTransport) SetReconnectCallback(cb func()) { - p.carrier.SetReconnectCallback(func(_ *webrtc.DataChannel) { - if cb != nil { - cb() - } - }) +func (p *streamTransport) SetReconnectCallback(cb func()) { + p.stream.SetReconnectCallback(cb) } // SetShouldReconnect configures reconnect policy. -func (p *providerTransport) SetShouldReconnect(fn func() bool) { - p.carrier.SetShouldReconnect(fn) +func (p *streamTransport) SetShouldReconnect(fn func() bool) { + p.stream.SetShouldReconnect(fn) } // SetEndedCallback registers end-of-session handling. -func (p *providerTransport) SetEndedCallback(cb func(string)) { - p.carrier.SetEndedCallback(cb) +func (p *streamTransport) SetEndedCallback(cb func(string)) { + p.stream.SetEndedCallback(cb) } // WatchConnection monitors connection lifecycle. -func (p *providerTransport) WatchConnection(ctx context.Context) { - p.carrier.WatchConnection(ctx) +func (p *streamTransport) WatchConnection(ctx context.Context) { + p.stream.WatchConnection(ctx) } // CanSend reports whether transport is ready for sending. -func (p *providerTransport) CanSend() bool { - return p.carrier.CanSend() +func (p *streamTransport) CanSend() bool { + return p.stream.CanSend() } From ea249091c45998dedeb829cc2acb07711a44fdad Mon Sep 17 00:00:00 2001 From: zarazaex69 Date: Mon, 20 Apr 2026 20:18:23 +0300 Subject: [PATCH 05/10] refactor: make link selectable at runtime --- cmd/olcrtc/main.go | 3 +++ internal/app/session/session.go | 16 ++++++++++++++++ internal/client/client.go | 9 ++++++--- internal/link/link.go | 9 +++++++++ internal/server/server.go | 6 ++++-- mobile/mobile.go | 1 + 6 files changed, 39 insertions(+), 5 deletions(-) diff --git a/cmd/olcrtc/main.go b/cmd/olcrtc/main.go index 6306a0d..10b5068 100644 --- a/cmd/olcrtc/main.go +++ b/cmd/olcrtc/main.go @@ -18,6 +18,7 @@ import ( type config struct { mode string + link string transport string carrier string roomID string @@ -83,6 +84,7 @@ func parseFlags() config { cfg := config{} flag.StringVar(&cfg.mode, "mode", "", "Mode: srv or cnc") + flag.StringVar(&cfg.link, "link", "direct", "Link: direct") flag.StringVar(&cfg.transport, "transport", "datachannel", "Transport: datachannel") flag.StringVar(&cfg.carrier, "carrier", "", "Carrier: telemost, jazz, wb_stream") flag.StringVar(&cfg.roomID, "id", "", "Room ID") @@ -132,6 +134,7 @@ func loadNames(dataDir string) error { func toSessionConfig(cfg config) session.Config { return session.Config{ Mode: cfg.mode, + Link: cfg.link, Transport: cfg.transport, Carrier: firstNonEmpty(cfg.carrier, cfg.provider), RoomID: cfg.roomID, diff --git a/internal/app/session/session.go b/internal/app/session/session.go index 9127d26..2023c87 100644 --- a/internal/app/session/session.go +++ b/internal/app/session/session.go @@ -27,6 +27,8 @@ var ( ErrCarrierRequired = errors.New("carrier required (use -carrier telemost or -carrier jazz)") // ErrUnsupportedCarrier indicates that carrier is not registered. ErrUnsupportedCarrier = errors.New("unsupported carrier") + // ErrUnsupportedLink indicates that link is not registered. + ErrUnsupportedLink = errors.New("unsupported link") // ErrUnsupportedTransport indicates that transport is not registered. ErrUnsupportedTransport = errors.New("unsupported transport") ) @@ -34,6 +36,7 @@ var ( // Config holds runtime session settings. type Config struct { Mode string + Link string Transport string Carrier string RoomID string @@ -75,11 +78,22 @@ func Validate(cfg Config) error { } } + availableLinks := link.Available() + validLink := false + for _, l := range availableLinks { + if cfg.Link == l { + validLink = true + break + } + } + switch { case cfg.Carrier == "": return ErrCarrierRequired case !validCarrier: return fmt.Errorf("%w: %s (available: %v)", ErrUnsupportedCarrier, cfg.Carrier, availableCarriers) + case !validLink: + return fmt.Errorf("%w: %s (available: %v)", ErrUnsupportedLink, cfg.Link, availableLinks) case !validTransport: return fmt.Errorf("%w: %s (available: %v)", ErrUnsupportedTransport, cfg.Transport, availableTransports) case cfg.RoomID == "" && cfg.Carrier != "jazz": @@ -99,6 +113,7 @@ func Run(ctx context.Context, cfg Config) error { case "srv": return server.Run( ctx, + cfg.Link, cfg.Transport, cfg.Carrier, roomURL, @@ -110,6 +125,7 @@ func Run(ctx context.Context, cfg Config) error { case "cnc": return client.Run( ctx, + cfg.Link, cfg.Transport, cfg.Carrier, roomURL, diff --git a/internal/client/client.go b/internal/client/client.go index 0aa38a2..a38e564 100644 --- a/internal/client/client.go +++ b/internal/client/client.go @@ -58,6 +58,7 @@ type Client struct { // Run starts the client with the specified parameters. func Run( ctx context.Context, + linkName, transportName, providerName, roomURL, @@ -67,12 +68,13 @@ func Run( socksUser string, socksPass string, ) error { - return RunWithReady(ctx, transportName, providerName, roomURL, keyHex, localAddr, dnsServer, socksUser, socksPass, nil) + return RunWithReady(ctx, linkName, transportName, providerName, roomURL, keyHex, localAddr, dnsServer, socksUser, socksPass, nil) } // RunWithReady is like Run but accepts a callback that is called when the client is ready. func RunWithReady( ctx context.Context, + linkName, transportName, providerName, roomURL, @@ -109,7 +111,7 @@ func RunWithReady( const peerCount = 1 for i := range peerCount { - if err := c.addTransport(runCtx, transportName, providerName, roomURL, i, cancel, dnsServer, "", 0); err != nil { + if err := c.addTransport(runCtx, linkName, transportName, providerName, roomURL, i, cancel, dnsServer, "", 0); err != nil { return fmt.Errorf("addTransport failed: %w", err) } } @@ -187,6 +189,7 @@ func (c *Client) setupMux() { func (c *Client) addTransport( ctx context.Context, + linkName, transportName, providerName, roomURL string, @@ -196,7 +199,7 @@ func (c *Client) addTransport( socksProxyAddr string, socksProxyPort int, ) error { - ln, err := link.New(ctx, "direct", link.Config{ + ln, err := link.New(ctx, linkName, link.Config{ Transport: transportName, Carrier: providerName, RoomURL: roomURL, diff --git a/internal/link/link.go b/internal/link/link.go index 7d671c6..bb86890 100644 --- a/internal/link/link.go +++ b/internal/link/link.go @@ -53,3 +53,12 @@ func New(ctx context.Context, name string, cfg Config) (Link, error) { } return factory(ctx, cfg) } + +// Available returns a list of registered link names. +func Available() []string { + names := make([]string, 0, len(registry)) + for name := range registry { + names = append(names, name) + } + return names +} diff --git a/internal/server/server.go b/internal/server/server.go index e17c650..dc6b304 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -68,6 +68,7 @@ type ConnectRequest struct { // Run starts the server with the specified parameters. func Run( ctx context.Context, + linkName, transportName, providerName, roomURL, @@ -103,7 +104,7 @@ func Run( const peerCount = 1 for i := range peerCount { - if err := s.addTransport(runCtx, transportName, providerName, roomURL, i, cancel); err != nil { + if err := s.addTransport(runCtx, linkName, transportName, providerName, roomURL, i, cancel); err != nil { return fmt.Errorf("addTransport failed: %w", err) } } @@ -187,13 +188,14 @@ func (s *Server) setupMux() { func (s *Server) addTransport( ctx context.Context, + linkName, transportName, providerName, roomURL string, peerID int, cancel context.CancelFunc, ) error { - ln, err := link.New(ctx, "direct", link.Config{ + ln, err := link.New(ctx, linkName, link.Config{ Transport: transportName, Carrier: providerName, RoomURL: roomURL, diff --git a/mobile/mobile.go b/mobile/mobile.go index 3032534..9fc0bd9 100644 --- a/mobile/mobile.go +++ b/mobile/mobile.go @@ -109,6 +109,7 @@ func Start(roomID, keyHex string, socksPort int, socksUser, socksPass string) er err := client.RunWithReady( ctx, + "direct", "datachannel", "telemost", roomURL, From 2cad1b0e87f59d26f71bd26da5e973d9b105e913 Mon Sep 17 00:00:00 2001 From: zarazaex69 Date: Mon, 20 Apr 2026 20:20:02 +0300 Subject: [PATCH 06/10] refactor: align tunnel runtime terminology --- internal/client/client.go | 54 +++++++++++++++++++-------------------- internal/server/server.go | 50 ++++++++++++++++++------------------ 2 files changed, 52 insertions(+), 52 deletions(-) diff --git a/internal/client/client.go b/internal/client/client.go index a38e564..5e30e4e 100644 --- a/internal/client/client.go +++ b/internal/client/client.go @@ -29,8 +29,8 @@ var ( ErrKeyStringLength = errors.New("key string length must be 32") // ErrInvalidSocks5 is returned when the SOCKS version is not 5. ErrInvalidSocks5 = errors.New("invalid SOCKS5 version") - // ErrNoPeers is returned when no peers are available for sending. - ErrNoPeers = errors.New("no peers available") + // ErrNoLinks is returned when no links are available for sending. + ErrNoLinks = errors.New("no links available") // ErrEncryptFailed is returned when encryption fails. ErrEncryptFailed = errors.New("encrypt failed") // ErrUnsupportedSocksCommand is returned when a SOCKS5 command is not supported. @@ -41,14 +41,14 @@ var ( ErrTunnelSetupFailed = errors.New("tunnel setup failed") ) -// Client handles local SOCKS5 connections and tunnels them via WebRTC. +// Client handles local SOCKS5 connections and tunnels them through the selected runtime stack. type Client struct { links []link.Link cipher *crypto.Cipher mux *mux.Multiplexer connections map[uint16]net.Conn connMu sync.RWMutex - peerIdx atomic.Uint32 + linkIdx atomic.Uint32 clientID uint32 activeClients atomic.Int32 wg sync.WaitGroup @@ -60,7 +60,7 @@ func Run( ctx context.Context, linkName, transportName, - providerName, + carrierName, roomURL, keyHex string, localAddr string, @@ -68,7 +68,7 @@ func Run( socksUser string, socksPass string, ) error { - return RunWithReady(ctx, linkName, transportName, providerName, roomURL, keyHex, localAddr, dnsServer, socksUser, socksPass, nil) + return RunWithReady(ctx, linkName, transportName, carrierName, roomURL, keyHex, localAddr, dnsServer, socksUser, socksPass, nil) } // RunWithReady is like Run but accepts a callback that is called when the client is ready. @@ -76,7 +76,7 @@ func RunWithReady( ctx context.Context, linkName, transportName, - providerName, + carrierName, roomURL, keyHex string, localAddr string, @@ -109,10 +109,10 @@ func RunWithReady( c.setupMux() - const peerCount = 1 - for i := range peerCount { - if err := c.addTransport(runCtx, linkName, transportName, providerName, roomURL, i, cancel, dnsServer, "", 0); err != nil { - return fmt.Errorf("addTransport failed: %w", err) + const linkCount = 1 + for i := range linkCount { + if err := c.addLink(runCtx, linkName, transportName, carrierName, roomURL, i, cancel, dnsServer, "", 0); err != nil { + return fmt.Errorf("addLink failed: %w", err) } } @@ -180,20 +180,20 @@ func (c *Client) setupMux() { return fmt.Errorf("%w: %w", ErrEncryptFailed, err) } if len(c.links) == 0 { - return ErrNoPeers + return ErrNoLinks } - idx := c.peerIdx.Add(1) % uint32(len(c.links)) //nolint:gosec + idx := c.linkIdx.Add(1) % uint32(len(c.links)) //nolint:gosec return c.links[idx].Send(encrypted) }) } -func (c *Client) addTransport( +func (c *Client) addLink( ctx context.Context, linkName, transportName, - providerName, + carrierName, roomURL string, - peerID int, + linkID int, cancel context.CancelFunc, dnsServer, socksProxyAddr string, @@ -201,7 +201,7 @@ func (c *Client) addTransport( ) error { ln, err := link.New(ctx, linkName, link.Config{ Transport: transportName, - Carrier: providerName, + Carrier: carrierName, RoomURL: roomURL, Name: names.Generate(), OnData: c.onData, @@ -214,20 +214,20 @@ func (c *Client) addTransport( } ln.SetEndedCallback(func(reason string) { - logger.Infof("Client transport %d reported conference end: %s", peerID, reason) + logger.Infof("Client link %d reported conference end: %s", linkID, reason) cancel() }) c.links = append(c.links, ln) ln.SetReconnectCallback(func() { - c.handleTransportReconnect(peerID) + c.handleLinkReconnect(linkID) }) - logger.Infof("Connecting transport %d via %s/%s...", peerID, transportName, providerName) + logger.Infof("Connecting link %d via %s/%s/%s...", linkID, linkName, transportName, carrierName) if err := ln.Connect(ctx); err != nil { - return fmt.Errorf("failed to connect transport: %w", err) + return fmt.Errorf("failed to connect link: %w", err) } - logger.Infof("Transport %d connected", peerID) + logger.Infof("Link %d connected", linkID) c.wg.Add(1) go func() { @@ -243,8 +243,8 @@ func (c *Client) addTransport( return nil } -func (c *Client) handleTransportReconnect(peerID int) { - logger.Infof("transport %d reconnect event", peerID) +func (c *Client) handleLinkReconnect(linkID int) { + logger.Infof("link %d reconnect event", linkID) c.connMu.Lock() for sid, conn := range c.connections { @@ -261,9 +261,9 @@ func (c *Client) handleTransportReconnect(peerID int) { return fmt.Errorf("%w: %w", ErrEncryptFailed, err) } if len(c.links) == 0 { - return ErrNoPeers + return ErrNoLinks } - idx := c.peerIdx.Add(1) % uint32(len(c.links)) //nolint:gosec + idx := c.linkIdx.Add(1) % uint32(len(c.links)) //nolint:gosec return c.links[idx].Send(encrypted) }) c.mux.Reset() @@ -448,7 +448,7 @@ func (c *Client) shutdown() { c.connMu.Unlock() for i, tr := range c.links { - logger.Infof("closing transport %d", i) + logger.Infof("closing link %d", i) _ = tr.Close() } } diff --git a/internal/server/server.go b/internal/server/server.go index dc6b304..6a8e25c 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -32,15 +32,15 @@ var ( ErrSocks5AuthFailed = errors.New("SOCKS5 auth failed") // ErrSocks5ConnectFailed is returned when SOCKS5 connection fails. ErrSocks5ConnectFailed = errors.New("SOCKS5 connect failed") - // ErrNoPeers is returned when no peers are available. - ErrNoPeers = errors.New("no peers available") + // ErrNoLinks is returned when no links are available. + ErrNoLinks = errors.New("no links available") // ErrDialProxy is returned when dialing the proxy fails. ErrDialProxy = errors.New("failed to dial proxy") // ErrEncryptFailed is returned when encryption fails. ErrEncryptFailed = errors.New("encrypt failed") ) -// Server handles incoming WebRTC connections and proxies their traffic. +// Server handles incoming tunnel connections and proxies their traffic. type Server struct { links []link.Link cipher *crypto.Cipher @@ -49,7 +49,7 @@ type Server struct { connMu sync.RWMutex streamPumps map[uint16]net.Conn pumpMu sync.Mutex - peerIdx atomic.Uint32 + linkIdx atomic.Uint32 activeClients atomic.Int32 wg sync.WaitGroup dnsServer string @@ -70,7 +70,7 @@ func Run( ctx context.Context, linkName, transportName, - providerName, + carrierName, roomURL, keyHex string, dnsServer, @@ -102,10 +102,10 @@ func Run( s.setupResolver() s.setupMux() - const peerCount = 1 - for i := range peerCount { - if err := s.addTransport(runCtx, linkName, transportName, providerName, roomURL, i, cancel); err != nil { - return fmt.Errorf("addTransport failed: %w", err) + const linkCount = 1 + for i := range linkCount { + if err := s.addLink(runCtx, linkName, transportName, carrierName, roomURL, i, cancel); err != nil { + return fmt.Errorf("addLink failed: %w", err) } } @@ -179,25 +179,25 @@ func (s *Server) setupMux() { return fmt.Errorf("%w: %w", ErrEncryptFailed, err) } if len(s.links) == 0 { - return ErrNoPeers + return ErrNoLinks } - idx := s.peerIdx.Add(1) % uint32(len(s.links)) //nolint:gosec + idx := s.linkIdx.Add(1) % uint32(len(s.links)) //nolint:gosec return s.links[idx].Send(encrypted) }) } -func (s *Server) addTransport( +func (s *Server) addLink( ctx context.Context, linkName, transportName, - providerName, + carrierName, roomURL string, - peerID int, + linkID int, cancel context.CancelFunc, ) error { ln, err := link.New(ctx, linkName, link.Config{ Transport: transportName, - Carrier: providerName, + Carrier: carrierName, RoomURL: roomURL, Name: names.Generate(), OnData: s.onData, @@ -210,20 +210,20 @@ func (s *Server) addTransport( } ln.SetEndedCallback(func(reason string) { - logger.Infof("Server transport %d reported conference end: %s", peerID, reason) + logger.Infof("Server link %d reported conference end: %s", linkID, reason) cancel() }) s.links = append(s.links, ln) ln.SetReconnectCallback(func() { - s.handleTransportReconnect(peerID) + s.handleLinkReconnect(linkID) }) - logger.Infof("Connecting transport %d via %s/%s...", peerID, transportName, providerName) + logger.Infof("Connecting link %d via %s/%s/%s...", linkID, linkName, transportName, carrierName) if err := ln.Connect(ctx); err != nil { - return fmt.Errorf("failed to connect transport: %w", err) + return fmt.Errorf("failed to connect link: %w", err) } - logger.Infof("Transport %d connected", peerID) + logger.Infof("Link %d connected", linkID) s.wg.Add(1) go func() { @@ -233,8 +233,8 @@ func (s *Server) addTransport( return nil } -func (s *Server) handleTransportReconnect(peerID int) { - logger.Infof("transport %d reconnect event", peerID) +func (s *Server) handleLinkReconnect(linkID int) { + logger.Infof("link %d reconnect event", linkID) s.connMu.Lock() for sid, conn := range s.connections { @@ -251,9 +251,9 @@ func (s *Server) handleTransportReconnect(peerID int) { return fmt.Errorf("%w: %w", ErrEncryptFailed, err) } if len(s.links) == 0 { - return ErrNoPeers + return ErrNoLinks } - idx := s.peerIdx.Add(1) % uint32(len(s.links)) //nolint:gosec + idx := s.linkIdx.Add(1) % uint32(len(s.links)) //nolint:gosec return s.links[idx].Send(encrypted) }) s.mux.Reset() @@ -353,7 +353,7 @@ func (s *Server) shutdown() { s.connMu.Unlock() for i, tr := range s.links { - logger.Infof("closing transport %d", i) + logger.Infof("closing link %d", i) _ = tr.Close() } } From eac53946fc157d3742e34fd0321c21c27699a1db Mon Sep 17 00:00:00 2001 From: zarazaex69 Date: Mon, 20 Apr 2026 20:22:58 +0300 Subject: [PATCH 07/10] refactor: decouple carrier registry from provider --- internal/app/session/session.go | 6 +- internal/carrier/bytestream.go | 40 ++++++----- internal/carrier/carrier.go | 75 ++++++++++++++++++--- internal/transport/datachannel/transport.go | 14 +++- 4 files changed, 105 insertions(+), 30 deletions(-) diff --git a/internal/app/session/session.go b/internal/app/session/session.go index 2023c87..c73cdd5 100644 --- a/internal/app/session/session.go +++ b/internal/app/session/session.go @@ -50,9 +50,9 @@ type Config struct { // RegisterDefaults registers built-in providers and transports. func RegisterDefaults() { - carrier.Register("jazz", jazz.New) - carrier.Register("telemost", telemost.New) - carrier.Register("wb_stream", wbstream.New) + carrier.RegisterLegacy("jazz", jazz.New) + carrier.RegisterLegacy("telemost", telemost.New) + carrier.RegisterLegacy("wb_stream", wbstream.New) link.Register("direct", direct.New) transport.Register("datachannel", datachannel.New) diff --git a/internal/carrier/bytestream.go b/internal/carrier/bytestream.go index e7967a4..15cc096 100644 --- a/internal/carrier/bytestream.go +++ b/internal/carrier/bytestream.go @@ -3,6 +3,7 @@ package carrier import ( "context" + "github.com/openlibrecommunity/olcrtc/internal/provider" "github.com/pion/webrtc/v4" ) @@ -18,30 +19,39 @@ type ByteStream interface { CanSend() bool } -type providerByteStream struct { - carrier Carrier +type legacySession struct { + provider provider.Provider } -// OpenByteStream adapts a carrier to a generic byte-stream capability. -func OpenByteStream(c Carrier) ByteStream { - return &providerByteStream{carrier: c} +// Capabilities reports the transport primitives supported by the legacy carrier. +func (s *legacySession) Capabilities() Capabilities { + return Capabilities{ByteStream: true} } -func (p *providerByteStream) Connect(ctx context.Context) error { return p.carrier.Connect(ctx) } -func (p *providerByteStream) Send(data []byte) error { return p.carrier.Send(data) } -func (p *providerByteStream) Close() error { return p.carrier.Close() } +// OpenByteStream adapts the legacy provider to a generic byte stream capability. +func (s *legacySession) OpenByteStream() (ByteStream, error) { + return &legacyByteStream{provider: s.provider}, nil +} -func (p *providerByteStream) SetReconnectCallback(cb func()) { - p.carrier.SetReconnectCallback(func(_ *webrtc.DataChannel) { +type legacyByteStream struct { + provider provider.Provider +} + +func (p *legacyByteStream) Connect(ctx context.Context) error { return p.provider.Connect(ctx) } +func (p *legacyByteStream) Send(data []byte) error { return p.provider.Send(data) } +func (p *legacyByteStream) Close() error { return p.provider.Close() } + +func (p *legacyByteStream) SetReconnectCallback(cb func()) { + p.provider.SetReconnectCallback(func(_ *webrtc.DataChannel) { if cb != nil { cb() } }) } -func (p *providerByteStream) SetShouldReconnect(fn func() bool) { p.carrier.SetShouldReconnect(fn) } -func (p *providerByteStream) SetEndedCallback(cb func(string)) { p.carrier.SetEndedCallback(cb) } -func (p *providerByteStream) WatchConnection(ctx context.Context) { - p.carrier.WatchConnection(ctx) +func (p *legacyByteStream) SetShouldReconnect(fn func() bool) { p.provider.SetShouldReconnect(fn) } +func (p *legacyByteStream) SetEndedCallback(cb func(string)) { p.provider.SetEndedCallback(cb) } +func (p *legacyByteStream) WatchConnection(ctx context.Context) { + p.provider.WatchConnection(ctx) } -func (p *providerByteStream) CanSend() bool { return p.carrier.CanSend() } +func (p *legacyByteStream) CanSend() bool { return p.provider.CanSend() } diff --git a/internal/carrier/carrier.go b/internal/carrier/carrier.go index 0e0cf9d..8c6c30e 100644 --- a/internal/carrier/carrier.go +++ b/internal/carrier/carrier.go @@ -3,30 +3,85 @@ package carrier import ( "context" + "errors" "github.com/openlibrecommunity/olcrtc/internal/provider" ) -// Carrier is the current carrier implementation contract. -type Carrier = provider.Provider +var ( + // ErrCarrierNotFound is returned when a requested carrier is not registered. + ErrCarrierNotFound = errors.New("carrier not found") + // ErrByteStreamUnsupported is returned when a carrier cannot provide a byte stream. + ErrByteStreamUnsupported = errors.New("carrier does not support byte stream") +) + +// Capabilities describes the transport primitives a carrier can expose. +type Capabilities struct { + ByteStream bool +} + +// Session is the carrier-level runtime handle. +type Session interface { + Capabilities() Capabilities +} + +// ByteStreamCapable is implemented by carriers that can expose a byte stream. +type ByteStreamCapable interface { + OpenByteStream() (ByteStream, error) +} // Config holds carrier configuration. -type Config = provider.Config +type Config struct { + RoomURL string + Name string + OnData func([]byte) + DNSServer string + ProxyAddr string + ProxyPort int +} -// Factory creates a new carrier instance. -type Factory = provider.Factory +// Factory creates a new carrier session. +type Factory func(ctx context.Context, cfg Config) (Session, error) + +var registry = make(map[string]Factory) // Register adds a carrier factory to the registry. func Register(name string, factory Factory) { - provider.Register(name, factory) + registry[name] = factory } -// New creates a carrier instance by name. -func New(ctx context.Context, name string, cfg Config) (Carrier, error) { - return provider.New(ctx, name, cfg) +// RegisterLegacy adapts an existing provider factory into the carrier registry. +func RegisterLegacy(name string, factory provider.Factory) { + Register(name, func(ctx context.Context, cfg Config) (Session, error) { + legacy, err := factory(ctx, provider.Config{ + RoomURL: cfg.RoomURL, + Name: cfg.Name, + OnData: cfg.OnData, + DNSServer: cfg.DNSServer, + ProxyAddr: cfg.ProxyAddr, + ProxyPort: cfg.ProxyPort, + }) + if err != nil { + return nil, err + } + return &legacySession{provider: legacy}, nil + }) +} + +// New creates a carrier session by name. +func New(ctx context.Context, name string, cfg Config) (Session, error) { + factory, ok := registry[name] + if !ok { + return nil, ErrCarrierNotFound + } + return factory(ctx, cfg) } // Available returns a list of registered carriers. func Available() []string { - return provider.Available() + names := make([]string, 0, len(registry)) + for name := range registry { + names = append(names, name) + } + return names } diff --git a/internal/transport/datachannel/transport.go b/internal/transport/datachannel/transport.go index 3faa4ca..504f957 100644 --- a/internal/transport/datachannel/transport.go +++ b/internal/transport/datachannel/transport.go @@ -15,7 +15,7 @@ type streamTransport struct { // New creates a datachannel transport backed by a carrier-specific provider. func New(ctx context.Context, cfg transport.Config) (transport.Transport, error) { - c, err := carrier.New(ctx, cfg.Carrier, carrier.Config{ + session, err := carrier.New(ctx, cfg.Carrier, carrier.Config{ RoomURL: cfg.RoomURL, Name: cfg.Name, OnData: cfg.OnData, @@ -27,7 +27,17 @@ func New(ctx context.Context, cfg transport.Config) (transport.Transport, error) return nil, fmt.Errorf("create provider transport: %w", err) } - return &streamTransport{stream: carrier.OpenByteStream(c)}, nil + streamCapable, ok := session.(carrier.ByteStreamCapable) + if !ok { + return nil, carrier.ErrByteStreamUnsupported + } + + stream, err := streamCapable.OpenByteStream() + if err != nil { + return nil, fmt.Errorf("open byte stream: %w", err) + } + + return &streamTransport{stream: stream}, nil } // Connect starts the transport connection. From d3b2cc4e67f24c4df38d5b355bae2beeb2a0446a Mon Sep 17 00:00:00 2001 From: zarazaex69 Date: Mon, 20 Apr 2026 20:24:12 +0300 Subject: [PATCH 08/10] refactor: move builtin carrier registration behind carrier layer --- internal/app/session/session.go | 11 +++-------- internal/carrier/builtin/register.go | 16 ++++++++++++++++ 2 files changed, 19 insertions(+), 8 deletions(-) create mode 100644 internal/carrier/builtin/register.go diff --git a/internal/app/session/session.go b/internal/app/session/session.go index c73cdd5..3f3e4dd 100644 --- a/internal/app/session/session.go +++ b/internal/app/session/session.go @@ -7,19 +7,17 @@ import ( "fmt" "github.com/openlibrecommunity/olcrtc/internal/carrier" + "github.com/openlibrecommunity/olcrtc/internal/carrier/builtin" "github.com/openlibrecommunity/olcrtc/internal/client" "github.com/openlibrecommunity/olcrtc/internal/link" "github.com/openlibrecommunity/olcrtc/internal/link/direct" - "github.com/openlibrecommunity/olcrtc/internal/provider/jazz" - "github.com/openlibrecommunity/olcrtc/internal/provider/telemost" - "github.com/openlibrecommunity/olcrtc/internal/provider/wbstream" "github.com/openlibrecommunity/olcrtc/internal/server" "github.com/openlibrecommunity/olcrtc/internal/transport" "github.com/openlibrecommunity/olcrtc/internal/transport/datachannel" ) var ( - // ErrRoomIDRequired indicates that a room id is required for the selected provider. + // ErrRoomIDRequired indicates that a room id is required for the selected carrier. ErrRoomIDRequired = errors.New("room ID required") // ErrModeRequired indicates that mode is not one of the supported values. ErrModeRequired = errors.New("specify -mode srv or -mode cnc") @@ -50,10 +48,7 @@ type Config struct { // RegisterDefaults registers built-in providers and transports. func RegisterDefaults() { - carrier.RegisterLegacy("jazz", jazz.New) - carrier.RegisterLegacy("telemost", telemost.New) - carrier.RegisterLegacy("wb_stream", wbstream.New) - + builtin.Register() link.Register("direct", direct.New) transport.Register("datachannel", datachannel.New) } diff --git a/internal/carrier/builtin/register.go b/internal/carrier/builtin/register.go new file mode 100644 index 0000000..dd57026 --- /dev/null +++ b/internal/carrier/builtin/register.go @@ -0,0 +1,16 @@ +// Package builtin registers the built-in carrier implementations. +package builtin + +import ( + "github.com/openlibrecommunity/olcrtc/internal/carrier" + "github.com/openlibrecommunity/olcrtc/internal/provider/jazz" + "github.com/openlibrecommunity/olcrtc/internal/provider/telemost" + "github.com/openlibrecommunity/olcrtc/internal/provider/wbstream" +) + +// Register wires the built-in legacy carriers into the carrier registry. +func Register() { + carrier.RegisterLegacy("jazz", jazz.New) + carrier.RegisterLegacy("telemost", telemost.New) + carrier.RegisterLegacy("wb_stream", wbstream.New) +} From 0e4dea928a4f0dd5b8276f0ec932e74f9ec84e78 Mon Sep 17 00:00:00 2001 From: zarazaex69 Date: Mon, 20 Apr 2026 20:25:43 +0300 Subject: [PATCH 09/10] refactor: describe transport delivery features --- internal/transport/datachannel/transport.go | 12 ++++++++++++ internal/transport/transport.go | 9 +++++++++ 2 files changed, 21 insertions(+) diff --git a/internal/transport/datachannel/transport.go b/internal/transport/datachannel/transport.go index 504f957..8b61848 100644 --- a/internal/transport/datachannel/transport.go +++ b/internal/transport/datachannel/transport.go @@ -9,6 +9,8 @@ import ( "github.com/openlibrecommunity/olcrtc/internal/transport" ) +const defaultMaxPayloadSize = 12 * 1024 + type streamTransport struct { stream carrier.ByteStream } @@ -79,3 +81,13 @@ func (p *streamTransport) WatchConnection(ctx context.Context) { func (p *streamTransport) CanSend() bool { return p.stream.CanSend() } + +// Features describes the current datachannel transport semantics. +func (p *streamTransport) Features() transport.Features { + return transport.Features{ + Reliable: true, + Ordered: true, + MessageOriented: true, + MaxPayloadSize: defaultMaxPayloadSize, + } +} diff --git a/internal/transport/transport.go b/internal/transport/transport.go index 012b3c5..74c8c4c 100644 --- a/internal/transport/transport.go +++ b/internal/transport/transport.go @@ -11,6 +11,14 @@ var ( ErrTransportNotFound = errors.New("transport not found") ) +// Features describes the delivery semantics of a transport. +type Features struct { + Reliable bool + Ordered bool + MessageOriented bool + MaxPayloadSize int +} + // Transport defines a byte transport independent of the underlying carrier. type Transport interface { Connect(ctx context.Context) error @@ -21,6 +29,7 @@ type Transport interface { SetEndedCallback(cb func(string)) WatchConnection(ctx context.Context) CanSend() bool + Features() Features } // Config holds common transport configuration. From 2a3a7bb9c3c20886aa95a2e0447f9c403e12bdce Mon Sep 17 00:00:00 2001 From: zarazaex69 Date: Mon, 20 Apr 2026 20:39:34 +0300 Subject: [PATCH 10/10] feat: add VideoTrack capability to provider and carrier interfaces --- internal/carrier/bytestream.go | 26 +++++++++++++++++++++++++- internal/carrier/carrier.go | 8 ++++++++ internal/provider/provider.go | 4 +++- 3 files changed, 36 insertions(+), 2 deletions(-) diff --git a/internal/carrier/bytestream.go b/internal/carrier/bytestream.go index 15cc096..02a584d 100644 --- a/internal/carrier/bytestream.go +++ b/internal/carrier/bytestream.go @@ -19,13 +19,20 @@ type ByteStream interface { CanSend() bool } +// VideoTrack is a carrier capability for publishing a local video track. +type VideoTrack interface { + AddTrack(track *webrtc.TrackLocalStaticRTP) (*webrtc.RTPSender, error) +} + type legacySession struct { provider provider.Provider } // Capabilities reports the transport primitives supported by the legacy carrier. func (s *legacySession) Capabilities() Capabilities { - return Capabilities{ByteStream: true} + caps := Capabilities{ByteStream: true} + _, caps.VideoTrack = s.provider.(provider.VideoTrackCapable) + return caps } // OpenByteStream adapts the legacy provider to a generic byte stream capability. @@ -33,6 +40,15 @@ func (s *legacySession) OpenByteStream() (ByteStream, error) { return &legacyByteStream{provider: s.provider}, nil } +// OpenVideoTrack adapts a legacy provider to the generic video track capability. +func (s *legacySession) OpenVideoTrack() (VideoTrack, error) { + publisher, ok := s.provider.(provider.VideoTrackCapable) + if !ok { + return nil, ErrVideoTrackUnsupported + } + return &legacyVideoTrack{provider: publisher}, nil +} + type legacyByteStream struct { provider provider.Provider } @@ -55,3 +71,11 @@ func (p *legacyByteStream) WatchConnection(ctx context.Context) { p.provider.WatchConnection(ctx) } func (p *legacyByteStream) CanSend() bool { return p.provider.CanSend() } + +type legacyVideoTrack struct { + provider provider.VideoTrackCapable +} + +func (v *legacyVideoTrack) AddTrack(track *webrtc.TrackLocalStaticRTP) (*webrtc.RTPSender, error) { + return v.provider.AddVideoTrack(track) +} diff --git a/internal/carrier/carrier.go b/internal/carrier/carrier.go index 8c6c30e..cbc8e38 100644 --- a/internal/carrier/carrier.go +++ b/internal/carrier/carrier.go @@ -13,11 +13,14 @@ var ( ErrCarrierNotFound = errors.New("carrier not found") // ErrByteStreamUnsupported is returned when a carrier cannot provide a byte stream. ErrByteStreamUnsupported = errors.New("carrier does not support byte stream") + // ErrVideoTrackUnsupported is returned when a carrier cannot publish video tracks. + ErrVideoTrackUnsupported = errors.New("carrier does not support video tracks") ) // Capabilities describes the transport primitives a carrier can expose. type Capabilities struct { ByteStream bool + VideoTrack bool } // Session is the carrier-level runtime handle. @@ -30,6 +33,11 @@ type ByteStreamCapable interface { OpenByteStream() (ByteStream, error) } +// VideoTrackCapable is implemented by carriers that can publish video tracks. +type VideoTrackCapable interface { + OpenVideoTrack() (VideoTrack, error) +} + // Config holds carrier configuration. type Config struct { RoomURL string diff --git a/internal/provider/provider.go b/internal/provider/provider.go index 09265ed..bbc7497 100644 --- a/internal/provider/provider.go +++ b/internal/provider/provider.go @@ -35,8 +35,10 @@ type Provider interface { CanSend() bool GetSendQueue() chan []byte GetBufferedAmount() uint64 +} - // AddVideoTrack adds a video track to the connection. +// VideoTrackCapable is implemented by providers that can publish video tracks. +type VideoTrackCapable interface { AddVideoTrack(track *webrtc.TrackLocalStaticRTP) (*webrtc.RTPSender, error) }