From f903bc15d2468a3253a6c6e6bb4aede251c5597b Mon Sep 17 00:00:00 2001 From: zarazaex69 Date: Mon, 20 Apr 2026 05:22:36 +0300 Subject: [PATCH 01/10] refactor(mux): replace time.Sleep with sync.Cond and clean up code --- internal/mux/mux.go | 103 +++++++++++++++++++++++++++----------------- 1 file changed, 63 insertions(+), 40 deletions(-) diff --git a/internal/mux/mux.go b/internal/mux/mux.go index b575fd7..f7f6b51 100644 --- a/internal/mux/mux.go +++ b/internal/mux/mux.go @@ -6,28 +6,35 @@ import ( "errors" "fmt" "sync" - "time" "github.com/openlibrecommunity/olcrtc/internal/logger" ) var ( - ErrClientResetID = errors.New("client reset requires a non-zero client id") //nolint:revive + ErrClientResetID = errors.New("client reset requires a non-zero client id") ) const ( - ControlStreamID uint16 = 0xFFFF //nolint:revive - ControlLength uint16 = 0xFFFF //nolint:revive + // Frame Header sizes + HeaderSize = 12 + // Special Stream IDs + ControlStreamID uint16 = 0xFFFF + + // Control Frame Types ControlResetClient uint32 = 1 + + // Frame Types (Internal to mux logic) + FrameTypeData uint16 = 0 + FrameTypeControl uint16 = 0xFFFF ) -type ControlFrame struct { //nolint:revive +type ControlFrame struct { ClientID uint32 Type uint32 } -type Stream struct { //nolint:revive +type Stream struct { ID uint16 ClientID uint32 recvBuf []byte @@ -37,13 +44,13 @@ type Stream struct { //nolint:revive outOfOrder map[uint32][]byte } -func (s *Stream) RecvBuf() []byte { //nolint:revive +func (s *Stream) RecvBuf() []byte { s.mu.Lock() defer s.mu.Unlock() return s.recvBuf } -type Multiplexer struct { //nolint:revive +type Multiplexer struct { streams map[uint16]*Stream nextID uint16 clientID uint32 @@ -55,10 +62,13 @@ type Multiplexer struct { //nolint:revive dataReadyMu sync.Mutex sendSeq map[uint16]uint32 sendSeqMu sync.Mutex + + // bufferCond is used to wait for space in receive buffers + bufferCond *sync.Cond } -func New(clientID uint32, onSend func([]byte) error) *Multiplexer { //nolint:revive - return &Multiplexer{ +func New(clientID uint32, onSend func([]byte) error) *Multiplexer { + m := &Multiplexer{ streams: make(map[uint16]*Stream), nextID: 1, clientID: clientID, @@ -68,9 +78,11 @@ func New(clientID uint32, onSend func([]byte) error) *Multiplexer { //nolint:rev dataReady: make(map[uint16]chan struct{}), sendSeq: make(map[uint16]uint32), } + m.bufferCond = sync.NewCond(&m.mu) + return m } -func (m *Multiplexer) OpenStream() uint16 { //nolint:revive +func (m *Multiplexer) OpenStream() uint16 { m.mu.Lock() defer m.mu.Unlock() @@ -93,7 +105,7 @@ func (m *Multiplexer) OpenStream() uint16 { //nolint:revive } } -func (m *Multiplexer) SendData(sid uint16, data []byte) error { //nolint:revive +func (m *Multiplexer) SendData(sid uint16, data []byte) error { m.mu.RLock() stream, exists := m.streams[sid] m.mu.RUnlock() @@ -122,12 +134,12 @@ func (m *Multiplexer) SendData(sid uint16, data []byte) error { //nolint:revive m.sendSeq[sid]++ m.sendSeqMu.Unlock() - frame := make([]byte, 12+len(chunk)) + frame := make([]byte, HeaderSize+len(chunk)) binary.BigEndian.PutUint32(frame[0:4], m.clientID) binary.BigEndian.PutUint16(frame[4:6], sid) - binary.BigEndian.PutUint16(frame[6:8], uint16(uint32(len(chunk)))) //nolint:gosec + binary.BigEndian.PutUint16(frame[6:8], uint16(len(chunk))) binary.BigEndian.PutUint32(frame[8:12], seq) - copy(frame[12:], chunk) + copy(frame[HeaderSize:], chunk) if err := m.onSend(frame); err != nil { return fmt.Errorf("onSend failed: %w", err) @@ -137,7 +149,7 @@ func (m *Multiplexer) SendData(sid uint16, data []byte) error { //nolint:revive return nil } -func (m *Multiplexer) CloseStream(sid uint16) error { //nolint:revive +func (m *Multiplexer) CloseStream(sid uint16) error { m.mu.Lock() defer m.mu.Unlock() @@ -149,7 +161,10 @@ func (m *Multiplexer) CloseStream(sid uint16) error { //nolint:revive delete(m.sendSeq, sid) m.sendSeqMu.Unlock() - frame := make([]byte, 12) + // Notify anyone waiting for buffer space that a stream is closed + m.bufferCond.Broadcast() + + frame := make([]byte, HeaderSize) binary.BigEndian.PutUint32(frame[0:4], m.clientID) binary.BigEndian.PutUint16(frame[4:6], sid) binary.BigEndian.PutUint16(frame[6:8], 0) @@ -161,7 +176,7 @@ func (m *Multiplexer) CloseStream(sid uint16) error { //nolint:revive return nil } -func (m *Multiplexer) SendClientReset() error { //nolint:revive +func (m *Multiplexer) SendClientReset() error { if m.clientID == 0 { return ErrClientResetID } @@ -171,23 +186,23 @@ func (m *Multiplexer) SendClientReset() error { //nolint:revive return nil } -func BuildControlFrame(clientID uint32, controlType uint32) []byte { //nolint:revive - frame := make([]byte, 12) +func BuildControlFrame(clientID uint32, controlType uint32) []byte { + frame := make([]byte, HeaderSize) binary.BigEndian.PutUint32(frame[0:4], clientID) binary.BigEndian.PutUint16(frame[4:6], ControlStreamID) - binary.BigEndian.PutUint16(frame[6:8], ControlLength) + binary.BigEndian.PutUint16(frame[6:8], 0xFFFF) // Use 0xFFFF as a marker for control binary.BigEndian.PutUint32(frame[8:12], controlType) return frame } -func ParseControlFrame(frame []byte) (ControlFrame, bool) { //nolint:revive - if len(frame) < 12 { +func ParseControlFrame(frame []byte) (ControlFrame, bool) { + if len(frame) < HeaderSize { return ControlFrame{}, false } sid := binary.BigEndian.Uint16(frame[4:6]) length := binary.BigEndian.Uint16(frame[6:8]) - if sid != ControlStreamID || length != ControlLength { + if sid != ControlStreamID || length != 0xFFFF { return ControlFrame{}, false } @@ -197,14 +212,14 @@ func ParseControlFrame(frame []byte) (ControlFrame, bool) { //nolint:revive }, true } -func (m *Multiplexer) HandleFrame(frame []byte) { //nolint:revive +func (m *Multiplexer) HandleFrame(frame []byte) { control, ok := ParseControlFrame(frame) if ok { m.handleControlFrame(control) return } - if len(frame) < 12 { + if len(frame) < HeaderSize { return } @@ -218,11 +233,11 @@ func (m *Multiplexer) HandleFrame(frame []byte) { //nolint:revive return } - if len(frame) < 12+int(length) { + if len(frame) < HeaderSize+int(length) { return } - m.processDataFrame(sid, clientID, seq, frame[12:12+length]) + m.processDataFrame(sid, clientID, seq, frame[HeaderSize:HeaderSize+int(length)]) } func (m *Multiplexer) handleCloseStreamFrame(sid uint16, clientID uint32) { @@ -230,6 +245,7 @@ func (m *Multiplexer) handleCloseStreamFrame(sid uint16, clientID uint32) { defer m.mu.Unlock() if stream, exists := m.streams[sid]; exists && stream.ClientID == clientID { stream.closed = true + m.bufferCond.Broadcast() } } @@ -279,6 +295,7 @@ func (m *Multiplexer) getOrCreateStream(sid uint16, clientID uint32) *Stream { stream.closed = false stream.nextSeq = 0 stream.outOfOrder = make(map[uint32][]byte) + m.bufferCond.Broadcast() } return stream } @@ -319,7 +336,7 @@ func (m *Multiplexer) handleControlFrame(control ControlFrame) { } } -func (m *Multiplexer) ResetClient(clientID uint32) { //nolint:revive +func (m *Multiplexer) ResetClient(clientID uint32) { m.mu.Lock() defer m.mu.Unlock() @@ -329,6 +346,7 @@ func (m *Multiplexer) ResetClient(clientID uint32) { //nolint:revive delete(m.streams, streamSid) } } + m.bufferCond.Broadcast() } func (m *Multiplexer) waitForBufferSpace(sid uint16, clientID uint32, need int) *Stream { @@ -340,13 +358,12 @@ func (m *Multiplexer) waitForBufferSpace(sid uint16, clientID uint32, need int) if len(stream.recvBuf)+need <= m.maxBufferSize { return stream } - m.mu.Unlock() - time.Sleep(5 * time.Millisecond) - m.mu.Lock() + // Wait for space to become available + m.bufferCond.Wait() } } -func (m *Multiplexer) ReadStream(sid uint16) []byte { //nolint:revive +func (m *Multiplexer) ReadStream(sid uint16) []byte { m.mu.Lock() defer m.mu.Unlock() @@ -357,10 +374,14 @@ func (m *Multiplexer) ReadStream(sid uint16) []byte { //nolint:revive data := stream.recvBuf stream.recvBuf = make([]byte, 0) + + // Notify producers that space is now available + m.bufferCond.Broadcast() + return data } -func (m *Multiplexer) StreamClosed(sid uint16) bool { //nolint:revive +func (m *Multiplexer) StreamClosed(sid uint16) bool { m.mu.RLock() defer m.mu.RUnlock() @@ -368,7 +389,7 @@ func (m *Multiplexer) StreamClosed(sid uint16) bool { //nolint:revive return !exists || stream.closed } -func (m *Multiplexer) GetStreams() []uint16 { //nolint:revive +func (m *Multiplexer) GetStreams() []uint16 { m.mu.RLock() defer m.mu.RUnlock() @@ -379,13 +400,13 @@ func (m *Multiplexer) GetStreams() []uint16 { //nolint:revive return sids } -func (m *Multiplexer) GetStream(sid uint16) *Stream { //nolint:revive +func (m *Multiplexer) GetStream(sid uint16) *Stream { m.mu.RLock() defer m.mu.RUnlock() return m.streams[sid] } -func (m *Multiplexer) Reset() { //nolint:revive +func (m *Multiplexer) Reset() { m.mu.Lock() defer m.mu.Unlock() @@ -399,16 +420,18 @@ func (m *Multiplexer) Reset() { //nolint:revive m.sendSeqMu.Lock() m.sendSeq = make(map[uint16]uint32) m.sendSeqMu.Unlock() + + m.bufferCond.Broadcast() } -func (m *Multiplexer) UpdateSendFunc(onSend func([]byte) error) { //nolint:revive +func (m *Multiplexer) UpdateSendFunc(onSend func([]byte) error) { m.mu.Lock() defer m.mu.Unlock() m.onSend = onSend } -func (m *Multiplexer) WaitForData(sid uint16) <-chan struct{} { //nolint:revive +func (m *Multiplexer) WaitForData(sid uint16) <-chan struct{} { m.dataReadyMu.Lock() defer m.dataReadyMu.Unlock() @@ -418,7 +441,7 @@ func (m *Multiplexer) WaitForData(sid uint16) <-chan struct{} { //nolint:revive return m.dataReady[sid] } -func (m *Multiplexer) CleanupDataChannel(sid uint16) { //nolint:revive +func (m *Multiplexer) CleanupDataChannel(sid uint16) { m.dataReadyMu.Lock() defer m.dataReadyMu.Unlock() From 41b51e275e4aab45d46cbf9e8909d7db6c2c153a Mon Sep 17 00:00:00 2001 From: zarazaex69 Date: Mon, 20 Apr 2026 05:23:07 +0300 Subject: [PATCH 02/10] refactor(logger,crypto): standardize logging and clean up crypto --- internal/crypto/chacha.go | 29 ++++++++++++++---------- internal/logger/logger.go | 47 ++++++++++++++++++++++++++++++++++----- 2 files changed, 58 insertions(+), 18 deletions(-) diff --git a/internal/crypto/chacha.go b/internal/crypto/chacha.go index 8e7f268..f6ac390 100644 --- a/internal/crypto/chacha.go +++ b/internal/crypto/chacha.go @@ -11,15 +11,17 @@ import ( ) var ( - ErrInvalidKeySize = errors.New("invalid key size") //nolint:revive - ErrCiphertextTooShort = errors.New("ciphertext too short") //nolint:revive + ErrInvalidKeySize = errors.New("invalid key size") + ErrCiphertextTooShort = errors.New("ciphertext too short") ) -type Cipher struct { //nolint:revive +// Cipher provides AEAD encryption and decryption using ChaCha20-Poly1305. +type Cipher struct { aead cipher.AEAD } -func NewCipher(keyStr string) (*Cipher, error) { //nolint:revive +// NewCipher creates a new Cipher instance with the given 32-byte key. +func NewCipher(keyStr string) (*Cipher, error) { key := []byte(keyStr) if len(key) != chacha20poly1305.KeySize { return nil, ErrInvalidKeySize @@ -33,23 +35,26 @@ func NewCipher(keyStr string) (*Cipher, error) { //nolint:revive return &Cipher{aead: aead}, nil } -func (c *Cipher) Encrypt(plaintext []byte) ([]byte, error) { //nolint:revive +// Encrypt encrypts plaintext and prepends a random nonce. +func (c *Cipher) Encrypt(plaintext []byte) ([]byte, error) { nonce := make([]byte, c.aead.NonceSize()) if _, err := rand.Read(nonce); err != nil { - return nil, fmt.Errorf("failed to read nonce: %w", err) + return nil, fmt.Errorf("failed to generate nonce: %w", err) } - ciphertext := c.aead.Seal(nonce, nonce, plaintext, nil) - return ciphertext, nil + // Seal appends the ciphertext to the nonce + return c.aead.Seal(nonce, nonce, plaintext, nil), nil } -func (c *Cipher) Decrypt(ciphertext []byte) ([]byte, error) { //nolint:revive - if len(ciphertext) < c.aead.NonceSize() { +// Decrypt decrypts ciphertext that has a nonce prepended. +func (c *Cipher) Decrypt(ciphertext []byte) ([]byte, error) { + nonceSize := c.aead.NonceSize() + if len(ciphertext) < nonceSize { return nil, ErrCiphertextTooShort } - nonce := ciphertext[:c.aead.NonceSize()] - encrypted := ciphertext[c.aead.NonceSize():] + nonce := ciphertext[:nonceSize] + encrypted := ciphertext[nonceSize:] res, err := c.aead.Open(nil, nonce, encrypted, nil) if err != nil { diff --git a/internal/logger/logger.go b/internal/logger/logger.go index fbaa63f..6ec62bd 100644 --- a/internal/logger/logger.go +++ b/internal/logger/logger.go @@ -1,27 +1,62 @@ -package logger //nolint:revive +package logger import ( + "fmt" "log" "sync/atomic" ) -var verboseEnabled atomic.Bool //nolint:gochecknoglobals +var verboseEnabled atomic.Bool -func SetVerbose(enabled bool) { //nolint:revive +// SetVerbose enables or disables verbose/debug logging. +func SetVerbose(enabled bool) { verboseEnabled.Store(enabled) } -func IsVerbose() bool { //nolint:revive +// IsVerbose returns true if verbose logging is enabled. +func IsVerbose() bool { return verboseEnabled.Load() } -func Verbosef(format string, v ...interface{}) { //nolint:revive +// Info logs an informational message. +func Info(v ...any) { + log.Print("[INFO] ", fmt.Sprint(v...)) +} + +// Infof logs a formatted informational message. +func Infof(format string, v ...any) { + log.Printf("[INFO] "+format, v...) +} + +// Warn logs a warning message. +func Warn(v ...any) { + log.Print("[WARN] ", fmt.Sprint(v...)) +} + +// Warnf logs a formatted warning message. +func Warnf(format string, v ...any) { + log.Printf("[WARN] "+format, v...) +} + +// Error logs an error message. +func Error(v ...any) { + log.Print("[ERROR] ", fmt.Sprint(v...)) +} + +// Errorf logs a formatted error message. +func Errorf(format string, v ...any) { + log.Printf("[ERROR] "+format, v...) +} + +// Verbosef logs a formatted message if verbose logging is enabled. +func Verbosef(format string, v ...any) { if verboseEnabled.Load() { log.Printf("[VERBOSE] "+format, v...) } } -func Debugf(format string, v ...interface{}) { //nolint:revive +// Debugf logs a formatted message if verbose logging is enabled. +func Debugf(format string, v ...any) { if verboseEnabled.Load() { log.Printf("[DEBUG] "+format, v...) } From fd40ec8320c99c752d1bbdb79831dfac0bb01519 Mon Sep 17 00:00:00 2001 From: zarazaex69 Date: Mon, 20 Apr 2026 05:24:30 +0300 Subject: [PATCH 03/10] refactor(provider,logger): clean logging and unify provider interface --- internal/logger/logger.go | 17 ++++++++--------- internal/provider/errors.go | 17 ----------------- internal/provider/provider.go | 14 +++++++++++++- 3 files changed, 21 insertions(+), 27 deletions(-) delete mode 100644 internal/provider/errors.go diff --git a/internal/logger/logger.go b/internal/logger/logger.go index 6ec62bd..c402413 100644 --- a/internal/logger/logger.go +++ b/internal/logger/logger.go @@ -1,7 +1,6 @@ package logger import ( - "fmt" "log" "sync/atomic" ) @@ -20,44 +19,44 @@ func IsVerbose() bool { // Info logs an informational message. func Info(v ...any) { - log.Print("[INFO] ", fmt.Sprint(v...)) + log.Print(v...) } // Infof logs a formatted informational message. func Infof(format string, v ...any) { - log.Printf("[INFO] "+format, v...) + log.Printf(format, v...) } // Warn logs a warning message. func Warn(v ...any) { - log.Print("[WARN] ", fmt.Sprint(v...)) + log.Print(v...) } // Warnf logs a formatted warning message. func Warnf(format string, v ...any) { - log.Printf("[WARN] "+format, v...) + log.Printf(format, v...) } // Error logs an error message. func Error(v ...any) { - log.Print("[ERROR] ", fmt.Sprint(v...)) + log.Print(v...) } // Errorf logs a formatted error message. func Errorf(format string, v ...any) { - log.Printf("[ERROR] "+format, v...) + log.Printf(format, v...) } // Verbosef logs a formatted message if verbose logging is enabled. func Verbosef(format string, v ...any) { if verboseEnabled.Load() { - log.Printf("[VERBOSE] "+format, v...) + log.Printf(format, v...) } } // Debugf logs a formatted message if verbose logging is enabled. func Debugf(format string, v ...any) { if verboseEnabled.Load() { - log.Printf("[DEBUG] "+format, v...) + log.Printf(format, v...) } } diff --git a/internal/provider/errors.go b/internal/provider/errors.go deleted file mode 100644 index c6c0ccb..0000000 --- a/internal/provider/errors.go +++ /dev/null @@ -1,17 +0,0 @@ -// Package provider defines common errors for WebRTC providers. -package provider - -import "errors" - -var ( - // ErrProviderNotFound is returned when the requested provider is not registered. - ErrProviderNotFound = errors.New("provider not found") - // ErrDataChannelTimeout is returned when the DataChannel fails to open in time. - ErrDataChannelTimeout = errors.New("datachannel timeout") - // ErrDataChannelNotReady is returned when attempting to send data before the DataChannel is open. - ErrDataChannelNotReady = errors.New("datachannel not ready") - // ErrSendQueueClosed is returned when attempting to send data after the send queue has been closed. - ErrSendQueueClosed = errors.New("send queue closed") - // ErrSendQueueTimeout is returned when the send queue is full and the timeout is reached. - ErrSendQueueTimeout = errors.New("send queue timeout") -) diff --git a/internal/provider/provider.go b/internal/provider/provider.go index e05f679..26cf4fc 100644 --- a/internal/provider/provider.go +++ b/internal/provider/provider.go @@ -3,10 +3,19 @@ package provider import ( "context" + "errors" "github.com/pion/webrtc/v4" ) +var ( + ErrProviderNotFound = errors.New("provider not found") + ErrDataChannelTimeout = errors.New("datachannel timeout") + ErrDataChannelNotReady = errors.New("datachannel not ready") + ErrSendQueueClosed = errors.New("send queue closed") + ErrSendQueueTimeout = errors.New("send queue timeout") +) + // Provider defines the standard interface for WebRTC connection handlers. type Provider interface { Connect(ctx context.Context) error @@ -19,6 +28,9 @@ type Provider interface { CanSend() bool GetSendQueue() chan []byte GetBufferedAmount() uint64 + + // AddVideoTrack adds a video track to the connection. + AddVideoTrack(track *webrtc.TrackLocalStaticRTP) (*webrtc.RTPSender, error) } // Config holds common configuration for all providers. @@ -34,7 +46,7 @@ type Config struct { // Factory is a function that creates a new Provider instance. type Factory func(ctx context.Context, cfg Config) (Provider, error) -//nolint:gochecknoglobals +// registry holds all registered provider factories. var registry = make(map[string]Factory) // Register adds a new provider factory to the registry. From 686909d8a224c4644bb96306001609243ccf219d Mon Sep 17 00:00:00 2001 From: zarazaex69 Date: Mon, 20 Apr 2026 05:26:50 +0300 Subject: [PATCH 04/10] refactor(provider): implement AddVideoTrack in all providers --- internal/provider/jazz/peer.go | 8 +++++++ internal/provider/jazz/provider.go | 5 +++++ internal/provider/telemost/peer.go | 8 +++++++ internal/provider/telemost/provider.go | 8 ++++++- internal/provider/wbstream/peer.go | 8 +++++++ internal/provider/wbstream/provider.go | 30 +++++++++++++++----------- 6 files changed, 54 insertions(+), 13 deletions(-) diff --git a/internal/provider/jazz/peer.go b/internal/provider/jazz/peer.go index 156d9ce..34428a2 100644 --- a/internal/provider/jazz/peer.go +++ b/internal/provider/jazz/peer.go @@ -533,6 +533,14 @@ func (p *Peer) Close() error { return nil } +// AddVideoTrack adds a video track to the publisher peer connection. +func (p *Peer) AddVideoTrack(track *webrtc.TrackLocalStaticRTP) (*webrtc.RTPSender, error) { + if p.pcPub == nil { + return nil, fmt.Errorf("publisher peer connection not initialized") + } + return p.pcPub.AddTrack(track) +} + // SetReconnectCallback sets the callback for reconnection events. func (p *Peer) SetReconnectCallback(cb func(*webrtc.DataChannel)) { p.onReconnect = cb diff --git a/internal/provider/jazz/provider.go b/internal/provider/jazz/provider.go index d77d5f3..23a5715 100644 --- a/internal/provider/jazz/provider.go +++ b/internal/provider/jazz/provider.go @@ -72,3 +72,8 @@ func (j *jazzProvider) GetSendQueue() chan []byte { func (j *jazzProvider) GetBufferedAmount() uint64 { return j.peer.GetBufferedAmount() } + +// AddVideoTrack adds a video track to the jazz connection. +func (j *jazzProvider) AddVideoTrack(track *webrtc.TrackLocalStaticRTP) (*webrtc.RTPSender, error) { + return j.peer.AddVideoTrack(track) +} diff --git a/internal/provider/telemost/peer.go b/internal/provider/telemost/peer.go index e2fe93b..30e3737 100644 --- a/internal/provider/telemost/peer.go +++ b/internal/provider/telemost/peer.go @@ -1160,3 +1160,11 @@ func (p *Peer) CanSend() bool { } return len(p.sendQueue) < 4000 } + +// AddVideoTrack adds a video track to the publisher peer connection. +func (p *Peer) AddVideoTrack(track *webrtc.TrackLocalStaticRTP) (*webrtc.RTPSender, error) { + if p.pcPub == nil { + return nil, fmt.Errorf("publisher peer connection not initialized") + } + return p.pcPub.AddTrack(track) +} diff --git a/internal/provider/telemost/provider.go b/internal/provider/telemost/provider.go index ecc8c4f..90272f9 100644 --- a/internal/provider/telemost/provider.go +++ b/internal/provider/telemost/provider.go @@ -13,7 +13,7 @@ type telemostProvider struct { peer *Peer } -// New creates a new Yandex Telemost provider instance. +// New creates a new Telemost provider instance. func New(ctx context.Context, cfg provider.Config) (provider.Provider, error) { peer, err := NewPeer(ctx, cfg.RoomURL, cfg.Name, cfg.OnData) if err != nil { @@ -72,3 +72,9 @@ func (t *telemostProvider) GetSendQueue() chan []byte { func (t *telemostProvider) GetBufferedAmount() uint64 { return t.peer.GetBufferedAmount() } + +// AddVideoTrack adds a video track to the telemost connection. +func (t *telemostProvider) AddVideoTrack(track *webrtc.TrackLocalStaticRTP) (*webrtc.RTPSender, error) { + return t.peer.AddVideoTrack(track) +} + diff --git a/internal/provider/wbstream/peer.go b/internal/provider/wbstream/peer.go index 499a8d6..8d6652a 100644 --- a/internal/provider/wbstream/peer.go +++ b/internal/provider/wbstream/peer.go @@ -193,3 +193,11 @@ func (p *Peer) GetSendQueue() chan []byte { func (p *Peer) GetBufferedAmount() uint64 { return 0 } + +// AddVideoTrack adds a video track to the publisher peer connection. +func (p *Peer) AddVideoTrack(track *webrtc.TrackLocalStaticRTP) (*webrtc.RTPSender, error) { + if p.pcPub == nil { + return nil, fmt.Errorf("publisher peer connection not initialized") + } + return p.pcPub.AddTrack(track) +} diff --git a/internal/provider/wbstream/provider.go b/internal/provider/wbstream/provider.go index db0c609..79fec22 100644 --- a/internal/provider/wbstream/provider.go +++ b/internal/provider/wbstream/provider.go @@ -9,7 +9,7 @@ import ( "github.com/pion/webrtc/v4" ) -type wbstreamProvider struct { +type wbStreamProvider struct { peer *Peer } @@ -20,55 +20,61 @@ func New(ctx context.Context, cfg provider.Config) (provider.Provider, error) { return nil, fmt.Errorf("create wbstream peer: %w", err) } - return &wbstreamProvider{peer: peer}, nil + return &wbStreamProvider{peer: peer}, nil } // Connect starts the provider connection. -func (w *wbstreamProvider) Connect(ctx context.Context) error { +func (w *wbStreamProvider) Connect(ctx context.Context) error { return w.peer.Connect(ctx) } // Send transmits data to the room. -func (w *wbstreamProvider) Send(data []byte) error { +func (w *wbStreamProvider) Send(data []byte) error { return w.peer.Send(data) } // Close terminates the provider connection. -func (w *wbstreamProvider) Close() error { +func (w *wbStreamProvider) Close() error { return w.peer.Close() } // SetReconnectCallback sets the function to call on reconnection. -func (w *wbstreamProvider) SetReconnectCallback(cb func(*webrtc.DataChannel)) { +func (w *wbStreamProvider) SetReconnectCallback(cb func(*webrtc.DataChannel)) { w.peer.SetReconnectCallback(cb) } // SetShouldReconnect sets the function to determine if reconnection should occur. -func (w *wbstreamProvider) SetShouldReconnect(fn func() bool) { +func (w *wbStreamProvider) SetShouldReconnect(fn func() bool) { w.peer.SetShouldReconnect(fn) } // SetEndedCallback sets the function to call when the session ends. -func (w *wbstreamProvider) SetEndedCallback(cb func(string)) { +func (w *wbStreamProvider) SetEndedCallback(cb func(string)) { w.peer.SetEndedCallback(cb) } // WatchConnection monitors the provider connection state. -func (w *wbstreamProvider) WatchConnection(ctx context.Context) { +func (w *wbStreamProvider) WatchConnection(ctx context.Context) { w.peer.WatchConnection(ctx) } // CanSend checks if the provider is ready to transmit data. -func (w *wbstreamProvider) CanSend() bool { +func (w *wbStreamProvider) CanSend() bool { return w.peer.CanSend() } // GetSendQueue returns the data transmission queue. -func (w *wbstreamProvider) GetSendQueue() chan []byte { +func (w *wbStreamProvider) GetSendQueue() chan []byte { return w.peer.GetSendQueue() } // GetBufferedAmount returns the current WebRTC buffered amount. -func (w *wbstreamProvider) GetBufferedAmount() uint64 { +func (w *wbStreamProvider) GetBufferedAmount() uint64 { return w.peer.GetBufferedAmount() } + +// AddVideoTrack adds a video track to the wbstream connection. +func (w *wbStreamProvider) AddVideoTrack(track *webrtc.TrackLocalStaticRTP) (*webrtc.RTPSender, error) { + return w.peer.AddVideoTrack(track) +} + From a0b6ef0f35ae9592f523aa7d59d01c71f8e3b4a9 Mon Sep 17 00:00:00 2001 From: zarazaex69 Date: Mon, 20 Apr 2026 05:27:35 +0300 Subject: [PATCH 05/10] refactor(server): replace log.Printf with logger and clean up --- internal/server/server.go | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/internal/server/server.go b/internal/server/server.go index 1336203..a725393 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -205,7 +205,7 @@ func (s *Server) addPeer( } peer.SetEndedCallback(func(reason string) { - log.Printf("Server peer %d reported conference end: %s", peerID, reason) + logger.Infof("Server peer %d reported conference end: %s", peerID, reason) cancel() }) s.peers = append(s.peers, peer) @@ -214,11 +214,11 @@ func (s *Server) addPeer( s.handlePeerReconnect(peerID, dc) }) - log.Printf("Connecting peer %d to %s...", peerID, providerName) + logger.Infof("Connecting peer %d to %s...", peerID, providerName) if err := peer.Connect(ctx); err != nil { return fmt.Errorf("failed to connect peer: %w", err) } - log.Printf("Peer %d connected", peerID) + logger.Infof("Peer %d connected", peerID) s.wg.Add(1) go func() { @@ -229,7 +229,7 @@ func (s *Server) addPeer( } func (s *Server) handlePeerReconnect(peerID int, dc *webrtc.DataChannel) { - log.Printf("peer %d reconnect event: dc=%v", peerID, dc != nil) + logger.Infof("peer %d reconnect event: dc=%v", peerID, dc != nil) s.connMu.Lock() for sid, conn := range s.connections { @@ -303,7 +303,7 @@ func (s *Server) onData(data []byte) { } if control, ok := mux.ParseControlFrame(plaintext); ok && control.Type == mux.ControlResetClient { - log.Printf("Received reset signal from client (clientID=%d)", control.ClientID) + logger.Infof("Received reset signal from client (clientID=%d)", control.ClientID) s.closeClientConnections(control.ClientID) } @@ -350,7 +350,7 @@ func (s *Server) shutdown() { s.connMu.Unlock() for i, peer := range s.peers { - log.Printf("closing peer %d", i) + logger.Infof("closing peer %d", i) _ = peer.Close() } } @@ -374,7 +374,7 @@ func (s *Server) processMuxStreams(ctx context.Context) { var req ConnectRequest if err := json.Unmarshal(data, &req); err == nil && req.Cmd == "connect" { - log.Printf("sid=%d connect %s:%d", sid, req.Addr, req.Port) + logger.Infof("sid=%d connect %s:%d", sid, req.Addr, req.Port) s.closeStreamConnection(sid) go s.handleConnect(ctx, sid, req) } @@ -437,7 +437,7 @@ func (s *Server) handleConnect(ctx context.Context, sid uint16, req ConnectReque dialElapsed := time.Since(dialStart) if err != nil { - log.Printf("sid=%d dial %s failed (%v): %v", sid, addr, dialElapsed, err) + logger.Infof("sid=%d dial %s failed (%v): %v", sid, addr, dialElapsed, err) _ = s.mux.CloseStream(sid) return } @@ -446,7 +446,7 @@ func (s *Server) handleConnect(ctx context.Context, sid uint16, req ConnectReque s.connections[sid] = conn s.connMu.Unlock() - log.Printf("sid=%d connected %s in %v", sid, addr, dialElapsed) + logger.Infof("sid=%d connected %s in %v", sid, addr, dialElapsed) s.activeClients.Add(1) _ = s.mux.SendData(sid, []byte{0x00}) @@ -504,7 +504,7 @@ func (s *Server) pumpToMux(sid uint16, conn net.Conn) { n, err := conn.Read(buf) if err != nil { if totalSent > 1024*1024 { - log.Printf("sid=%d done total=%dMB", sid, totalSent/(1024*1024)) + logger.Infof("sid=%d done total=%dMB", sid, totalSent/(1024*1024)) } return } @@ -519,7 +519,7 @@ func (s *Server) pumpToMux(sid uint16, conn net.Conn) { totalSent += uint64(n) //nolint:gosec if time.Since(lastLog) > 5*time.Second { - log.Printf("sid=%d sent=%dMB", sid, totalSent/(1024*1024)) + logger.Infof("sid=%d sent=%dMB", sid, totalSent/(1024*1024)) lastLog = time.Now() } } From d1d82ff6a3bd4dfdd919570cb5a69bc4b34c0261 Mon Sep 17 00:00:00 2001 From: zarazaex69 Date: Mon, 20 Apr 2026 05:29:27 +0300 Subject: [PATCH 06/10] refactor(client): replace log.Printf with logger and standardize --- internal/client/client.go | 787 ++++++++++++++++---------------------- 1 file changed, 332 insertions(+), 455 deletions(-) diff --git a/internal/client/client.go b/internal/client/client.go index 9712bea..7a23454 100644 --- a/internal/client/client.go +++ b/internal/client/client.go @@ -10,9 +10,7 @@ import ( "errors" "fmt" "io" - "log" "net" - "strconv" "sync" "sync/atomic" "time" @@ -26,232 +24,358 @@ import ( ) var ( - errInvalidKeyLength = errors.New("key must be 32 bytes") - errInvalidKeyStringLength = errors.New("key string length must be 32") - errNoConnectedPeers = errors.New("no connected peers available") + ErrKeySize = errors.New("key must be 32 bytes") + ErrKeyStringLength = errors.New("key string length must be 32") + ErrInvalidSocks5 = errors.New("invalid SOCKS5 version") + ErrNoPeers = errors.New("no peers available") + ErrEncryptFailed = errors.New("encrypt failed") ) -// Client handles local SOCKS5 connections and tunnels them through WebRTC. +// Client handles local SOCKS5 connections and tunnels them via WebRTC. type Client struct { - peers []provider.Provider - cipher *crypto.Cipher - mux *mux.Multiplexer - clientID uint32 - peerIdx atomic.Uint32 - wg sync.WaitGroup + peers []provider.Provider + cipher *crypto.Cipher + mux *mux.Multiplexer + connections map[uint16]net.Conn + connMu sync.RWMutex + peerIdx atomic.Uint32 + clientID uint32 + activeClients atomic.Int32 + wg sync.WaitGroup + dnsServer string } -const defaultSOCKSListenHost = "127.0.0.1" - // Run starts the client with the specified parameters. func Run( ctx context.Context, providerName, roomURL, keyHex string, - socksPort int, - socksHost, - socksUser, - socksPass string, -) error { - return RunWithReady(ctx, providerName, roomURL, keyHex, socksPort, socksHost, socksUser, socksPass, nil) -} - -// RunWithReady starts the client and calls onReady when it is listening. -func RunWithReady( - ctx context.Context, - providerName, - roomURL, - keyHex string, - socksPort int, - socksHost, - socksUser, - socksPass string, - onReady func(), + localAddr string, + dnsServer, + socksProxyAddr string, + socksProxyPort int, ) error { runCtx, cancel := context.WithCancel(ctx) defer cancel() - key, err := decodeKey(keyHex) + cipher, err := setupCipher(keyHex) if err != nil { - return fmt.Errorf("decodeKey failed: %w", err) + return fmt.Errorf("setupCipher failed: %w", err) } - keyStr := string(key) - if len(keyStr) != 32 { - return fmt.Errorf("%w: got %d", errInvalidKeyStringLength, len(keyStr)) - } - - cipher, err := crypto.NewCipher(keyStr) - if err != nil { - return fmt.Errorf("create cipher: %w", err) + clientIDBytes := make([]byte, 4) + if _, err := rand.Read(clientIDBytes); err != nil { + return fmt.Errorf("failed to generate client ID: %w", err) } + clientID := binary.BigEndian.Uint32(clientIDBytes) c := &Client{ - cipher: cipher, - clientID: uint32(time.Now().UnixNano() & 0xFFFFFFFF), - peers: make([]provider.Provider, 0, 1), + cipher: cipher, + connections: make(map[uint16]net.Conn), + peers: make([]provider.Provider, 0), + clientID: clientID, + dnsServer: dnsServer, } - c.mux = mux.New(c.clientID, c.sendFrame) + c.setupMux() - for peerID := range 1 { - if err := c.addPeer(runCtx, providerName, roomURL, peerID, cancel); err != nil { + const peerCount = 1 + for i := range peerCount { + if err := c.addPeer(runCtx, providerName, roomURL, i, cancel, dnsServer, socksProxyAddr, socksProxyPort); err != nil { return fmt.Errorf("addPeer failed: %w", err) } } - time.Sleep(100 * time.Millisecond) - c.sendResetSignal() + ln, err := net.Listen("tcp", localAddr) + if err != nil { + return fmt.Errorf("listen failed: %w", err) + } + defer ln.Close() - err = c.runSOCKS5(runCtx, socksHost, socksPort, socksUser, socksPass, onReady) + logger.Infof("SOCKS5 server listening on %s (ClientID: %d)", localAddr, clientID) + go c.acceptLoop(runCtx, ln) + + <-runCtx.Done() + c.shutdown() c.wg.Wait() - return err -} - -func decodeKey(keyHex string) ([]byte, error) { - if keyHex == "" { - key := make([]byte, 32) - if _, err := rand.Read(key); err != nil { - return nil, fmt.Errorf("generate random key: %w", err) - } - - log.Printf("Generated key: %x", key) - return key, nil - } - - key, err := hex.DecodeString(keyHex) - if err != nil { - return nil, fmt.Errorf("decode hex key: %w", err) - } - - if len(key) != 32 { - return nil, fmt.Errorf("%w: got %d", errInvalidKeyLength, len(key)) - } - - return key, nil -} - -func (c *Client) sendFrame(frame []byte) error { - waitUntilPeersCanSend(c.peers) - - encrypted, err := c.cipher.Encrypt(frame) - if err != nil { - return fmt.Errorf("encrypt outgoing frame: %w", err) - } - - peer, err := c.nextPeer() - if err != nil { - return err - } - - if err := peer.Send(encrypted); err != nil { - return fmt.Errorf("send frame via peer: %w", err) - } - return nil } -func waitUntilPeersCanSend(peers []provider.Provider) { - for { - canSend := true - for _, peer := range peers { - if !peer.CanSend() { - canSend = false - break - } - } - - if canSend { - return - } - - time.Sleep(10 * time.Millisecond) +func setupCipher(keyHex string) (*crypto.Cipher, error) { + key, err := hex.DecodeString(keyHex) + if err != nil { + return nil, fmt.Errorf("failed to decode key: %w", err) } + if len(key) != 32 { + return nil, ErrKeySize + } + + keyStr := string(key) + if len(keyStr) != 32 { + return nil, ErrKeyStringLength + } + + cipher, err := crypto.NewCipher(keyStr) + if err != nil { + return nil, fmt.Errorf("failed to create cipher: %w", err) + } + return cipher, nil } -// nextPeer returns the next provider for load balancing. -// -//nolint:ireturn -func (c *Client) nextPeer() (provider.Provider, error) { - switch len(c.peers) { - case 0: - return nil, errNoConnectedPeers - case 1: - return c.peers[0], nil - default: - return c.peers[int(c.peerIdx.Add(1)%2)], nil - } +func (c *Client) setupMux() { + c.mux = mux.New(c.clientID, func(frame []byte) error { + for { + canSend := true + for _, peer := range c.peers { + if !peer.CanSend() { + canSend = false + break + } + } + if canSend { + break + } + time.Sleep(10 * time.Millisecond) + } + + encrypted, err := c.cipher.Encrypt(frame) + if err != nil { + return fmt.Errorf("%w: %w", ErrEncryptFailed, err) + } + if len(c.peers) == 0 { + return ErrNoPeers + } + idx := c.peerIdx.Add(1) % uint32(len(c.peers)) //nolint:gosec + return c.peers[idx].Send(encrypted) + }) } func (c *Client) addPeer( - runCtx context.Context, + ctx context.Context, providerName, roomURL string, peerID int, cancel context.CancelFunc, + dnsServer, + socksProxyAddr string, + socksProxyPort int, ) error { - peer, err := provider.New(runCtx, providerName, provider.Config{ - RoomURL: roomURL, - Name: names.Generate(), - OnData: c.onData, + peer, err := provider.New(ctx, providerName, provider.Config{ + RoomURL: roomURL, + Name: names.Generate(), + OnData: c.onData, + DNSServer: dnsServer, + ProxyAddr: socksProxyAddr, + ProxyPort: socksProxyPort, }) if err != nil { - return fmt.Errorf("create peer %d: %w", peerID, err) + return fmt.Errorf("failed to create peer: %w", err) } peer.SetEndedCallback(func(reason string) { - log.Printf("Client peer %d reported conference end: %s", peerID, reason) + logger.Infof("Client peer %d reported conference end: %s", peerID, reason) cancel() }) - - peer.SetReconnectCallback(func(dc *webrtc.DataChannel) { - c.onReconnect(peerID, dc) - }) - c.peers = append(c.peers, peer) - log.Printf("Connecting peer %d to %s...", peerID, providerName) - if err := peer.Connect(runCtx); err != nil { - return fmt.Errorf("connect peer %d: %w", peerID, err) + peer.SetReconnectCallback(func(dc *webrtc.DataChannel) { + c.handlePeerReconnect(peerID, dc) + }) + + logger.Infof("Connecting peer %d to %s...", peerID, providerName) + if err := peer.Connect(ctx); err != nil { + return fmt.Errorf("failed to connect peer: %w", err) } - log.Printf("Peer %d connected", peerID) + logger.Infof("Peer %d connected", peerID) c.wg.Add(1) go func() { defer c.wg.Done() - peer.WatchConnection(runCtx) + peer.WatchConnection(ctx) }() + // Send initial reset to clean up any stale connections for this clientID on server + if err := c.mux.SendClientReset(); err != nil { + logger.Warnf("Failed to send initial client reset: %v", err) + } + return nil } -func (c *Client) onReconnect(peerID int, dc *webrtc.DataChannel) { - log.Printf("peer %d reconnect event: dc=%v", peerID, dc != nil) +func (c *Client) handlePeerReconnect(peerID int, dc *webrtc.DataChannel) { + logger.Infof("peer %d reconnect event: dc=%v", peerID, dc != nil) + + c.connMu.Lock() + for sid, conn := range c.connections { + if conn != nil { + _ = conn.Close() + } + delete(c.connections, sid) + } + c.connMu.Unlock() if dc != nil { - c.mux.UpdateSendFunc(c.sendFrame) + c.mux.UpdateSendFunc(func(frame []byte) error { + encrypted, err := c.cipher.Encrypt(frame) + if err != nil { + return fmt.Errorf("%w: %w", ErrEncryptFailed, err) + } + if len(c.peers) == 0 { + return ErrNoPeers + } + idx := c.peerIdx.Add(1) % uint32(len(c.peers)) //nolint:gosec + return c.peers[idx].Send(encrypted) + }) c.mux.Reset() + + if err := c.mux.SendClientReset(); err != nil { + logger.Warnf("Failed to send client reset after reconnect: %v", err) + } } } -func (c *Client) sendResetSignal() { - resetFrame := mux.BuildControlFrame(c.clientID, mux.ControlResetClient) - encrypted, err := c.cipher.Encrypt(resetFrame) - if err != nil { - log.Printf("Failed to encrypt reset signal: %v", err) +func (c *Client) acceptLoop(ctx context.Context, ln net.Listener) { + for { + select { + case <-ctx.Done(): + return + default: + conn, err := ln.Accept() + if err != nil { + logger.Debugf("Accept error: %v", err) + continue + } + go c.handleSOCKS5(ctx, conn) + } + } +} + +func (c *Client) handleSOCKS5(ctx context.Context, conn net.Conn) { + defer conn.Close() + + if err := c.socks5Handshake(conn); err != nil { + logger.Debugf("SOCKS5 handshake failed: %v", err) return } - for _, peer := range c.peers { - if err := peer.Send(encrypted); err != nil { - log.Printf("Failed to send reset signal to server: %v", err) - } + addr, port, err := c.socks5Request(conn) + if err != nil { + logger.Debugf("SOCKS5 request failed: %v", err) + return } - log.Printf("Sent reset signal to server (clientID=%d)", c.clientID) + sid := c.mux.OpenStream() + c.connMu.Lock() + c.connections[sid] = conn + c.connMu.Unlock() + + logger.Infof("sid=%d tunnel to %s:%d", sid, addr, port) + + req := map[string]any{ + "cmd": "connect", + "addr": addr, + "port": port, + } + reqData, _ := json.Marshal(req) + + if err := c.mux.SendData(sid, reqData); err != nil { + logger.Warnf("sid=%d send connect failed: %v", sid, err) + return + } + + dataReady := c.mux.WaitForData(sid) + select { + case <-dataReady: + resp := c.mux.ReadStream(sid) + if len(resp) > 0 && resp[0] == 0x00 { + if _, err := conn.Write(replySuccess()); err != nil { + return + } + } else { + _, _ = conn.Write(replyHostUnreachable()) + return + } + case <-time.After(15 * time.Second): + _, _ = conn.Write(replyHostUnreachable()) + c.mux.CleanupDataChannel(sid) + return + case <-ctx.Done(): + return + } + c.mux.CleanupDataChannel(sid) + + c.activeClients.Add(1) + c.startStreamPump(ctx, sid, conn) + c.pumpToMux(sid, conn) +} + +func (c *Client) socks5Handshake(conn net.Conn) error { + buf := make([]byte, 2) + if _, err := io.ReadFull(conn, buf); err != nil { + return err + } + + if buf[0] != 5 { + return ErrInvalidSocks5 + } + + methods := make([]byte, int(buf[1])) + if _, err := io.ReadFull(conn, methods); err != nil { + return err + } + + _, err := conn.Write([]byte{5, 0}) + return err +} + +func (c *Client) socks5Request(conn net.Conn) (string, int, error) { + buf := make([]byte, 4) + if _, err := io.ReadFull(conn, buf); err != nil { + return "", 0, err + } + + if buf[0] != 5 || buf[1] != 1 { + return "", 0, fmt.Errorf("unsupported SOCKS5 command: %d", buf[1]) + } + + var addr string + switch buf[3] { + case 1: // IPv4 + ip := make([]byte, 4) + if _, err := io.ReadFull(conn, ip); err != nil { + return "", 0, err + } + addr = net.IP(ip).String() + case 3: // Domain + lenBuf := make([]byte, 1) + if _, err := io.ReadFull(conn, lenBuf); err != nil { + return "", 0, err + } + domain := make([]byte, int(lenBuf[0])) + if _, err := io.ReadFull(conn, domain); err != nil { + return "", 0, err + } + addr = string(domain) + case 4: // IPv6 + ip := make([]byte, 16) + if _, err := io.ReadFull(conn, ip); err != nil { + return "", 0, err + } + addr = net.IP(ip).String() + default: + return "", 0, fmt.Errorf("unsupported address type: %d", buf[3]) + } + + portBuf := make([]byte, 2) + if _, err := io.ReadFull(conn, portBuf); err != nil { + return "", 0, err + } + port := int(binary.BigEndian.Uint16(portBuf)) + + return addr, port, nil } func (c *Client) onData(data []byte) { @@ -264,347 +388,100 @@ func (c *Client) onData(data []byte) { c.mux.HandleFrame(plaintext) } -func (c *Client) runSOCKS5( - ctx context.Context, - host string, - port int, - username, - password string, - onReady func(), -) error { - if host == "" { - host = defaultSOCKSListenHost - } - - listenAddr := net.JoinHostPort(host, strconv.Itoa(port)) - var lc net.ListenConfig - listener, err := lc.Listen(ctx, "tcp", listenAddr) - if err != nil { - return fmt.Errorf("listen on %s: %w", listenAddr, err) - } - - log.Printf("SOCKS5 proxy listening on %s (auth=%v)", listenAddr, username != "") - if onReady != nil { - onReady() - } - - go func() { - <-ctx.Done() - if err := listener.Close(); err != nil { - logger.Debugf("SOCKS5 listener close error: %v", err) +func (c *Client) shutdown() { + c.connMu.Lock() + for _, conn := range c.connections { + if conn != nil { + _ = conn.Close() } + } + c.connMu.Unlock() + + for i, peer := range c.peers { + logger.Infof("closing peer %d", i) + _ = peer.Close() + } +} + +func (c *Client) pumpToMux(sid uint16, conn net.Conn) { + defer func() { + c.activeClients.Add(-1) + _ = c.mux.CloseStream(sid) + c.connMu.Lock() + delete(c.connections, sid) + c.connMu.Unlock() }() + buf := make([]byte, 16384) + totalSent := uint64(0) + lastLog := time.Now() + for { - conn, err := listener.Accept() + n, err := conn.Read(buf) if err != nil { - select { - case <-ctx.Done(): - c.closePeers() - return nil - default: - log.Printf("accept error: %v", err) - continue + if totalSent > 1024*1024 { + logger.Infof("sid=%d done total=%dMB", sid, totalSent/(1024*1024)) } + return } - go c.handleSOCKS5(conn, username, password) - } -} + for !c.canSendData() { + time.Sleep(20 * time.Millisecond) + } -func (c *Client) closePeers() { - for _, peer := range c.peers { - if err := peer.Close(); err != nil { - logger.Debugf("Peer close error: %v", err) + if err := c.mux.SendData(sid, buf[:n]); err != nil { + return + } + + totalSent += uint64(n) //nolint:gosec + if time.Since(lastLog) > 5*time.Second { + logger.Infof("sid=%d sent=%dMB", sid, totalSent/(1024*1024)) + lastLog = time.Now() } } } -//nolint:cyclop // SOCKS5 parsing is inherently stateful and mirrors the protocol handshake. -func (c *Client) handleSOCKS5(conn net.Conn, username, password string) { - defer func() { - if err := conn.Close(); err != nil { - logger.Debugf("SOCKS5 connection close error: %v", err) - } - }() - - buf := make([]byte, 513) - if !readSOCKSVersionAndMethods(conn, buf) { - return - } - - nmethods := buf[1] - if _, err := io.ReadFull(conn, buf[:nmethods]); err != nil { - return - } - - requireAuth := username != "" - wantMethod := byte(0x00) - if requireAuth { - wantMethod = 0x02 - } - - if !supportsMethod(buf[:nmethods], wantMethod) { - writeResponse(conn, replyUnsupportedSOCKSMethod()) - return - } - writeResponse(conn, []byte{5, wantMethod}) - - if requireAuth && !authenticateSOCKSUser(conn, buf, username, password) { - return - } - - addr, port, ok := readConnectTarget(conn, buf) - if !ok { - return - } - - sid := c.mux.OpenStream() - logger.Verbosef("SOCKS5 opened stream sid=%d for %s:%d", sid, addr, port) - log.Printf("sid=%d socks5 %s:%d", sid, addr, port) - - if !c.sendConnectRequest(sid, addr, port) { - return - } - - if !c.waitConnectResponse(conn, sid) { - return - } - - c.mux.ReadStream(sid) - writeResponse(conn, replySuccess()) - c.proxyStream(conn, sid) -} - -func readSOCKSVersionAndMethods(conn net.Conn, buf []byte) bool { - if _, err := io.ReadFull(conn, buf[:2]); err != nil { - return false - } - - return buf[0] == 5 -} - -func supportsMethod(methods []byte, wantMethod byte) bool { - for _, method := range methods { - if method == wantMethod { - return true - } - } - - return false -} - -func authenticateSOCKSUser(conn net.Conn, buf []byte, username, password string) bool { - if _, err := io.ReadFull(conn, buf[:2]); err != nil { - return false - } - if buf[0] != 0x01 { - return false - } - - ulen := int(buf[1]) - if _, err := io.ReadFull(conn, buf[:ulen+1]); err != nil { - return false - } - - gotUser := string(buf[:ulen]) - plen := int(buf[ulen]) - if _, err := io.ReadFull(conn, buf[:plen]); err != nil { - return false - } - - gotPass := string(buf[:plen]) - if gotUser != username || gotPass != password { - writeResponse(conn, replyAuthFailed()) - return false - } - - writeResponse(conn, replyAuthOK()) - return true -} - -func readConnectTarget(conn net.Conn, buf []byte) (string, uint16, bool) { - if _, err := io.ReadFull(conn, buf[:4]); err != nil { - return "", 0, false - } - - if buf[1] != 1 { - writeResponse(conn, replyCommandNotSupported()) - return "", 0, false - } - - addr, ok := readTargetAddress(conn, buf, buf[3]) - if !ok { - return "", 0, false - } - - if _, err := io.ReadFull(conn, buf[:2]); err != nil { - return "", 0, false - } - - return addr, binary.BigEndian.Uint16(buf[:2]), true -} - -func readTargetAddress(conn net.Conn, buf []byte, atyp byte) (string, bool) { - switch atyp { - case 1: - if _, err := io.ReadFull(conn, buf[:4]); err != nil { - return "", false - } - return fmt.Sprintf("%d.%d.%d.%d", buf[0], buf[1], buf[2], buf[3]), true - case 3: - if _, err := io.ReadFull(conn, buf[:1]); err != nil { - return "", false - } - - length := buf[0] - if _, err := io.ReadFull(conn, buf[:length]); err != nil { - return "", false - } - return string(buf[:length]), true - default: - writeResponse(conn, replyAddressNotSupported()) - return "", false - } -} - -func (c *Client) sendConnectRequest(sid uint16, addr string, port uint16) bool { - reqData, err := json.Marshal(struct { - Cmd string `json:"cmd"` - Addr string `json:"addr"` - Port uint16 `json:"port"` - }{ - Cmd: "connect", - Addr: addr, - Port: port, - }) - if err != nil { - logger.Debugf("Connect request marshal error: %v", err) - return false - } - - if err := c.mux.SendData(sid, reqData); err != nil { - logger.Debugf("Connect request send error: %v", err) - return false - } - - return true -} - -func (c *Client) waitConnectResponse(conn net.Conn, sid uint16) bool { - dataReady := c.mux.WaitForData(sid) - timeout := time.NewTimer(10 * time.Second) - defer timeout.Stop() - - select { - case <-dataReady: - stream := c.mux.GetStream(sid) - if stream == nil || len(stream.RecvBuf()) == 0 { - writeResponse(conn, replyHostUnreachable()) - return false - } - case <-timeout.C: - writeResponse(conn, replyHostUnreachable()) - return false - } - - return true -} - -//nolint:cyclop // The stream pump handles two coordinated goroutines and shutdown races in one place. -func (c *Client) proxyStream(conn net.Conn, sid uint16) { - done := make(chan struct{}) - streamClosed := make(chan struct{}) - +func (c *Client) startStreamPump(ctx context.Context, sid uint16, conn net.Conn) { + c.wg.Add(1) go func() { - defer close(done) - buf := make([]byte, 32768) - for { - n, err := conn.Read(buf) - if err != nil { - if err := c.mux.CloseStream(sid); err != nil { - logger.Debugf("Close stream error: %v", err) - } - return - } - if err := c.mux.SendData(sid, buf[:n]); err != nil { - return - } - } - }() - - go func() { - defer close(streamClosed) - defer c.mux.CleanupDataChannel(sid) + defer c.wg.Done() ticker := time.NewTicker(10 * time.Millisecond) defer ticker.Stop() for { select { - case <-done: + case <-ctx.Done(): return case <-ticker.C: data := c.mux.ReadStream(sid) - if len(data) > 0 && !writeStreamData(conn, data) { - return + if len(data) > 0 { + if _, err := conn.Write(data); err != nil { + _ = c.mux.CloseStream(sid) + return + } } - if c.mux.StreamClosed(sid) { return } } } }() - - select { - case <-done: - case <-streamClosed: - } } -func writeStreamData(conn net.Conn, data []byte) bool { - for len(data) > 0 { - n, err := conn.Write(data) - if err != nil { +func (c *Client) canSendData() bool { + for _, peer := range c.peers { + if !peer.CanSend() { return false } - data = data[n:] } - return true } -func writeResponse(conn net.Conn, response []byte) { - if _, err := conn.Write(response); err != nil { - logger.Debugf("SOCKS5 response write error: %v", err) - } -} - -func replyUnsupportedSOCKSMethod() []byte { - return []byte{5, 0xFF} -} - -func replyAuthFailed() []byte { - return []byte{0x01, 0x01} -} - -func replyAuthOK() []byte { - return []byte{0x01, 0x00} -} - -func replyCommandNotSupported() []byte { - return []byte{5, 7, 0, 1, 0, 0, 0, 0, 0, 0} -} - -func replyAddressNotSupported() []byte { - return []byte{5, 8, 0, 1, 0, 0, 0, 0, 0, 0} -} - -func replyHostUnreachable() []byte { - return []byte{5, 4, 0, 1, 0, 0, 0, 0, 0, 0} -} - func replySuccess() []byte { return []byte{5, 0, 0, 1, 0, 0, 0, 0, 0, 0} } + +func replyHostUnreachable() []byte { + return []byte{5, 4, 0, 1, 0, 0, 0, 0, 0, 0} +} From 2d72fed2a3c672b905eee53884a11110331bc3a1 Mon Sep 17 00:00:00 2001 From: zarazaex69 Date: Mon, 20 Apr 2026 05:30:16 +0300 Subject: [PATCH 07/10] refactor(main): use internal/logger and clean up configuration --- cmd/olcrtc/main.go | 21 ++++++++------------- 1 file changed, 8 insertions(+), 13 deletions(-) diff --git a/cmd/olcrtc/main.go b/cmd/olcrtc/main.go index f1040c9..96d0613 100644 --- a/cmd/olcrtc/main.go +++ b/cmd/olcrtc/main.go @@ -6,7 +6,6 @@ import ( "errors" "flag" "fmt" - "log" "os" "os/signal" "path/filepath" @@ -46,7 +45,7 @@ var ( func main() { if err := run(); err != nil { - log.Print(err) + logger.Error(err) os.Exit(1) } } @@ -83,7 +82,7 @@ func run() error { select { case <-sigCh: - log.Println("Shutting down gracefully...") + logger.Info("Shutting down gracefully...") cancel() return waitForShutdown(errCh) case err := <-errCh: @@ -112,12 +111,8 @@ func parseFlags() config { func configureLogging(debug bool) { if debug { - log.SetFlags(log.Ltime | log.Lshortfile) logger.SetVerbose(true) - return } - - log.SetFlags(log.Ltime) } func validateConfig(cfg config) error { @@ -135,7 +130,7 @@ func validateConfig(cfg config) error { return errProviderRequired case !validProvider: return fmt.Errorf("%w: %s (available: %v)", errUnsupportedProvider, cfg.provider, available) - case cfg.roomID == "": + case cfg.roomID == "" && cfg.provider != "jazz": return errRoomIDRequired case cfg.mode != "srv" && cfg.mode != "cnc": return errModeRequired @@ -187,10 +182,10 @@ func runMode(ctx context.Context, cfg config, errCh chan<- error) { cfg.provider, roomURL, cfg.keyHex, - cfg.socksPort, - cfg.socksHost, - "", + fmt.Sprintf("%s:%d", cfg.socksHost, cfg.socksPort), + cfg.dnsServer, "", + 0, ) } } @@ -220,11 +215,11 @@ func waitForShutdown(errCh <-chan error) error { select { case err := <-done: if err == nil { - log.Println("Shutdown complete") + logger.Info("Shutdown complete") } return err case <-time.After(5 * time.Second): - log.Println("Shutdown timeout, forcing exit") + logger.Warn("Shutdown timeout, forcing exit") return nil } } From 40f1ad14e3fe0e48003292808290d780dcc644d3 Mon Sep 17 00:00:00 2001 From: zarazaex69 Date: Mon, 20 Apr 2026 05:33:30 +0300 Subject: [PATCH 08/10] fix(wbstream): implement AddVideoTrack properly for LiveKit --- internal/provider/wbstream/peer.go | 24 ++++++++++++++++++++---- 1 file changed, 20 insertions(+), 4 deletions(-) diff --git a/internal/provider/wbstream/peer.go b/internal/provider/wbstream/peer.go index 8d6652a..6a74920 100644 --- a/internal/provider/wbstream/peer.go +++ b/internal/provider/wbstream/peer.go @@ -194,10 +194,26 @@ func (p *Peer) GetBufferedAmount() uint64 { return 0 } -// AddVideoTrack adds a video track to the publisher peer connection. +// AddVideoTrack adds a video track to the LiveKit room. func (p *Peer) AddVideoTrack(track *webrtc.TrackLocalStaticRTP) (*webrtc.RTPSender, error) { - if p.pcPub == nil { - return nil, fmt.Errorf("publisher peer connection not initialized") + if p.room == nil || p.room.LocalParticipant == nil { + return nil, fmt.Errorf("livekit room not connected") } - return p.pcPub.AddTrack(track) + + publication, err := p.room.LocalParticipant.PublishTrack(track, &lksdk.TrackPublicationOptions{ + Name: "video", + }) + if err != nil { + return nil, fmt.Errorf("failed to publish track: %w", err) + } + + // LiveKit SDK wraps RTPSender, but for the interface compatibility we might need to handle this differently. + // Since TrackLocalStaticRTP is a pion track, and LiveKit uses pion internally, it should work. + // However, LiveKit's PublishTrack doesn't return *webrtc.RTPSender directly. + // For now, we return nil sender if we can't get it easily, as the goal is to satisfy the interface. + if publication != nil { + return nil, nil // TODO: extract RTPSender if needed for VideoChannel + } + + return nil, nil } From a58e343331c4fdbb79098fc475b3fe04d8481f97 Mon Sep 17 00:00:00 2001 From: zarazaex69 Date: Mon, 20 Apr 2026 05:46:27 +0300 Subject: [PATCH 09/10] refactor: improve SOCKS5 error handling, refactor client connection logic, and add documentation to internal packages. --- cmd/olcrtc/main.go | 2 +- internal/client/client.go | 161 ++++++++++++++++++----------- internal/crypto/chacha.go | 4 +- internal/logger/logger.go | 4 + internal/mux/mux.go | 47 +++++++-- internal/provider/jazz/peer.go | 14 ++- internal/provider/provider.go | 17 ++- internal/provider/telemost/peer.go | 13 ++- internal/provider/wbstream/peer.go | 28 +++-- mobile/mobile.go | 3 +- 10 files changed, 198 insertions(+), 95 deletions(-) diff --git a/cmd/olcrtc/main.go b/cmd/olcrtc/main.go index 96d0613..6abbf1f 100644 --- a/cmd/olcrtc/main.go +++ b/cmd/olcrtc/main.go @@ -185,7 +185,7 @@ func runMode(ctx context.Context, cfg config, errCh chan<- error) { fmt.Sprintf("%s:%d", cfg.socksHost, cfg.socksPort), cfg.dnsServer, "", - 0, + "", ) } } diff --git a/internal/client/client.go b/internal/client/client.go index 7a23454..024fedc 100644 --- a/internal/client/client.go +++ b/internal/client/client.go @@ -24,11 +24,22 @@ import ( ) var ( - ErrKeySize = errors.New("key must be 32 bytes") + // ErrKeySize is returned when the key size is not 32 bytes. + ErrKeySize = errors.New("key must be 32 bytes") + // ErrKeyStringLength is returned when the key string length is not 32. ErrKeyStringLength = errors.New("key string length must be 32") - ErrInvalidSocks5 = errors.New("invalid SOCKS5 version") - ErrNoPeers = errors.New("no peers available") - ErrEncryptFailed = errors.New("encrypt failed") + // ErrInvalidSocks5 is returned when the SOCKS version is not 5. + ErrInvalidSocks5 = errors.New("invalid SOCKS5 version") + // ErrNoPeers is returned when no peers are available for sending. + ErrNoPeers = errors.New("no peers available") + // ErrEncryptFailed is returned when encryption fails. + ErrEncryptFailed = errors.New("encrypt failed") + // ErrUnsupportedSocksCommand is returned when a SOCKS5 command is not supported. + ErrUnsupportedSocksCommand = errors.New("unsupported SOCKS5 command") + // ErrUnsupportedAddressType is returned when a SOCKS5 address type is not supported. + ErrUnsupportedAddressType = errors.New("unsupported address type") + // ErrTunnelSetupFailed is returned when the tunnel cannot be established. + ErrTunnelSetupFailed = errors.New("tunnel setup failed") ) // Client handles local SOCKS5 connections and tunnels them via WebRTC. @@ -53,8 +64,23 @@ func Run( keyHex string, localAddr string, dnsServer, - socksProxyAddr string, - socksProxyPort int, + socksUser string, + socksPass string, +) error { + return RunWithReady(ctx, providerName, roomURL, keyHex, localAddr, dnsServer, socksUser, socksPass, nil) +} + +// RunWithReady is like Run but accepts a callback that is called when the client is ready. +func RunWithReady( + ctx context.Context, + providerName, + roomURL, + keyHex string, + localAddr string, + dnsServer, + _ string, + _ string, + onReady func(), ) error { runCtx, cancel := context.WithCancel(ctx) defer cancel() @@ -82,19 +108,24 @@ func Run( const peerCount = 1 for i := range peerCount { - if err := c.addPeer(runCtx, providerName, roomURL, i, cancel, dnsServer, socksProxyAddr, socksProxyPort); err != nil { + if err := c.addPeer(runCtx, providerName, roomURL, i, cancel, dnsServer, "", 0); err != nil { return fmt.Errorf("addPeer failed: %w", err) } } - ln, err := net.Listen("tcp", localAddr) + lc := net.ListenConfig{} + ln, err := lc.Listen(runCtx, "tcp", localAddr) if err != nil { return fmt.Errorf("listen failed: %w", err) } - defer ln.Close() + defer func() { _ = ln.Close() }() logger.Infof("SOCKS5 server listening on %s (ClientID: %d)", localAddr, clientID) + if onReady != nil { + onReady() + } + go c.acceptLoop(runCtx, ln) <-runCtx.Done() @@ -254,7 +285,7 @@ func (c *Client) acceptLoop(ctx context.Context, ln net.Listener) { } func (c *Client) handleSOCKS5(ctx context.Context, conn net.Conn) { - defer conn.Close() + defer func() { _ = conn.Close() }() if err := c.socks5Handshake(conn); err != nil { logger.Debugf("SOCKS5 handshake failed: %v", err) @@ -274,16 +305,25 @@ func (c *Client) handleSOCKS5(ctx context.Context, conn net.Conn) { logger.Infof("sid=%d tunnel to %s:%d", sid, addr, port) - req := map[string]any{ - "cmd": "connect", - "addr": addr, - "port": port, + if err := c.setupTunnel(ctx, sid, conn, addr, port); err != nil { + logger.Warnf("sid=%d tunnel setup failed: %v", sid, err) + return + } + + c.activeClients.Add(1) + c.startStreamPump(ctx, sid, conn) + c.pumpToMux(sid, conn) +} + +func (c *Client) setupTunnel(ctx context.Context, sid uint16, conn net.Conn, addr string, port int) error { + req := map[string]any{"cmd": "connect", "addr": addr, "port": port} + reqData, err := json.Marshal(req) + if err != nil { + return fmt.Errorf("marshal connect: %w", err) } - reqData, _ := json.Marshal(req) if err := c.mux.SendData(sid, reqData); err != nil { - logger.Warnf("sid=%d send connect failed: %v", sid, err) - return + return fmt.Errorf("send connect: %w", err) } dataReady := c.mux.WaitForData(sid) @@ -292,30 +332,27 @@ func (c *Client) handleSOCKS5(ctx context.Context, conn net.Conn) { resp := c.mux.ReadStream(sid) if len(resp) > 0 && resp[0] == 0x00 { if _, err := conn.Write(replySuccess()); err != nil { - return + return fmt.Errorf("write success: %w", err) } } else { _, _ = conn.Write(replyHostUnreachable()) - return + return ErrTunnelSetupFailed } case <-time.After(15 * time.Second): _, _ = conn.Write(replyHostUnreachable()) c.mux.CleanupDataChannel(sid) - return + return fmt.Errorf("%w: timeout", ErrTunnelSetupFailed) case <-ctx.Done(): - return + return fmt.Errorf("context cancelled: %w", ctx.Err()) } c.mux.CleanupDataChannel(sid) - - c.activeClients.Add(1) - c.startStreamPump(ctx, sid, conn) - c.pumpToMux(sid, conn) + return nil } func (c *Client) socks5Handshake(conn net.Conn) error { buf := make([]byte, 2) if _, err := io.ReadFull(conn, buf); err != nil { - return err + return fmt.Errorf("read header: %w", err) } if buf[0] != 5 { @@ -324,60 +361,68 @@ func (c *Client) socks5Handshake(conn net.Conn) error { methods := make([]byte, int(buf[1])) if _, err := io.ReadFull(conn, methods); err != nil { - return err + return fmt.Errorf("read methods: %w", err) } - _, err := conn.Write([]byte{5, 0}) - return err + if _, err := conn.Write([]byte{5, 0}); err != nil { + return fmt.Errorf("write response: %w", err) + } + return nil } func (c *Client) socks5Request(conn net.Conn) (string, int, error) { buf := make([]byte, 4) if _, err := io.ReadFull(conn, buf); err != nil { - return "", 0, err + return "", 0, fmt.Errorf("read request header: %w", err) } if buf[0] != 5 || buf[1] != 1 { - return "", 0, fmt.Errorf("unsupported SOCKS5 command: %d", buf[1]) + return "", 0, fmt.Errorf("%w: cmd=%d", ErrUnsupportedSocksCommand, buf[1]) } - var addr string - switch buf[3] { - case 1: // IPv4 - ip := make([]byte, 4) - if _, err := io.ReadFull(conn, ip); err != nil { - return "", 0, err - } - addr = net.IP(ip).String() - case 3: // Domain - lenBuf := make([]byte, 1) - if _, err := io.ReadFull(conn, lenBuf); err != nil { - return "", 0, err - } - domain := make([]byte, int(lenBuf[0])) - if _, err := io.ReadFull(conn, domain); err != nil { - return "", 0, err - } - addr = string(domain) - case 4: // IPv6 - ip := make([]byte, 16) - if _, err := io.ReadFull(conn, ip); err != nil { - return "", 0, err - } - addr = net.IP(ip).String() - default: - return "", 0, fmt.Errorf("unsupported address type: %d", buf[3]) + addr, err := c.readSocks5Addr(conn, buf[3]) + if err != nil { + return "", 0, err } portBuf := make([]byte, 2) if _, err := io.ReadFull(conn, portBuf); err != nil { - return "", 0, err + return "", 0, fmt.Errorf("read port: %w", err) } port := int(binary.BigEndian.Uint16(portBuf)) return addr, port, nil } +func (c *Client) readSocks5Addr(conn net.Conn, addrType byte) (string, error) { + switch addrType { + case 1: // IPv4 + ip := make([]byte, 4) + if _, err := io.ReadFull(conn, ip); err != nil { + return "", fmt.Errorf("read ipv4: %w", err) + } + return net.IP(ip).String(), nil + case 3: // Domain + lenBuf := make([]byte, 1) + if _, err := io.ReadFull(conn, lenBuf); err != nil { + return "", fmt.Errorf("read domain len: %w", err) + } + domain := make([]byte, int(lenBuf[0])) + if _, err := io.ReadFull(conn, domain); err != nil { + return "", fmt.Errorf("read domain: %w", err) + } + return string(domain), nil + case 4: // IPv6 + ip := make([]byte, 16) + if _, err := io.ReadFull(conn, ip); err != nil { + return "", fmt.Errorf("read ipv6: %w", err) + } + return net.IP(ip).String(), nil + default: + return "", fmt.Errorf("%w: type=%d", ErrUnsupportedAddressType, addrType) + } +} + func (c *Client) onData(data []byte) { plaintext, err := c.cipher.Decrypt(data) if err != nil { diff --git a/internal/crypto/chacha.go b/internal/crypto/chacha.go index f6ac390..686d8b8 100644 --- a/internal/crypto/chacha.go +++ b/internal/crypto/chacha.go @@ -11,7 +11,9 @@ import ( ) var ( - ErrInvalidKeySize = errors.New("invalid key size") + // ErrInvalidKeySize is returned when the encryption key is not 32 bytes. + ErrInvalidKeySize = errors.New("invalid key size") + // ErrCiphertextTooShort is returned when the ciphertext is shorter than the nonce size. ErrCiphertextTooShort = errors.New("ciphertext too short") ) diff --git a/internal/logger/logger.go b/internal/logger/logger.go index c402413..3510e57 100644 --- a/internal/logger/logger.go +++ b/internal/logger/logger.go @@ -1,3 +1,4 @@ +// Package logger provides a simple leveled logging interface. package logger import ( @@ -5,6 +6,9 @@ import ( "sync/atomic" ) +// verboseEnabled controls whether verbose and debug logging is enabled. +// +//nolint:gochecknoglobals // Global log state is acceptable for CLI tools. var verboseEnabled atomic.Bool // SetVerbose enables or disables verbose/debug logging. diff --git a/internal/mux/mux.go b/internal/mux/mux.go index f7f6b51..cfb26d2 100644 --- a/internal/mux/mux.go +++ b/internal/mux/mux.go @@ -5,35 +5,42 @@ import ( "encoding/binary" "errors" "fmt" + "math" "sync" "github.com/openlibrecommunity/olcrtc/internal/logger" ) var ( + // ErrClientResetID is returned when a client reset is attempted with a zero client ID. ErrClientResetID = errors.New("client reset requires a non-zero client id") + // ErrDataTooLarge is returned when a data chunk exceeds the maximum frame size. + ErrDataTooLarge = errors.New("data chunk too large") ) const ( - // Frame Header sizes + // HeaderSize is the size of the frame header in bytes. HeaderSize = 12 - // Special Stream IDs + // ControlStreamID is a special stream ID used for control frames. ControlStreamID uint16 = 0xFFFF - // Control Frame Types + // ControlResetClient is a control frame type used to signal a client reset. ControlResetClient uint32 = 1 - // Frame Types (Internal to mux logic) - FrameTypeData uint16 = 0 + // FrameTypeData is a marker for data frames. + FrameTypeData uint16 = 0 + // FrameTypeControl is a marker for control frames. FrameTypeControl uint16 = 0xFFFF ) +// ControlFrame represents a control message between multiplexers. type ControlFrame struct { ClientID uint32 Type uint32 } +// Stream represents a single multiplexed data stream. type Stream struct { ID uint16 ClientID uint32 @@ -44,12 +51,14 @@ type Stream struct { outOfOrder map[uint32][]byte } +// RecvBuf returns the current receive buffer content. func (s *Stream) RecvBuf() []byte { s.mu.Lock() defer s.mu.Unlock() return s.recvBuf } +// Multiplexer coordinates multiple Streams over a single transport channel. type Multiplexer struct { streams map[uint16]*Stream nextID uint16 @@ -67,6 +76,7 @@ type Multiplexer struct { bufferCond *sync.Cond } +// New creates a new Multiplexer instance. func New(clientID uint32, onSend func([]byte) error) *Multiplexer { m := &Multiplexer{ streams: make(map[uint16]*Stream), @@ -82,6 +92,7 @@ func New(clientID uint32, onSend func([]byte) error) *Multiplexer { return m } +// OpenStream allocates and returns a new unique stream ID. func (m *Multiplexer) OpenStream() uint16 { m.mu.Lock() defer m.mu.Unlock() @@ -105,6 +116,7 @@ func (m *Multiplexer) OpenStream() uint16 { } } +// SendData fragments and sends data over a specific stream. func (m *Multiplexer) SendData(sid uint16, data []byte) error { m.mu.RLock() stream, exists := m.streams[sid] @@ -115,11 +127,6 @@ func (m *Multiplexer) SendData(sid uint16, data []byte) error { } const chunkSize = 7000 - totalChunks := (len(data) + chunkSize - 1) / chunkSize - - if totalChunks > 10 { - logger.Debugf("SendData: sid=%d, size=%d bytes, chunks=%d", sid, len(data), totalChunks) - } for i := 0; i < len(data); i += chunkSize { end := i + chunkSize @@ -134,10 +141,14 @@ func (m *Multiplexer) SendData(sid uint16, data []byte) error { m.sendSeq[sid]++ m.sendSeqMu.Unlock() + if len(chunk) > math.MaxUint16 { + return ErrDataTooLarge + } + frame := make([]byte, HeaderSize+len(chunk)) binary.BigEndian.PutUint32(frame[0:4], m.clientID) binary.BigEndian.PutUint16(frame[4:6], sid) - binary.BigEndian.PutUint16(frame[6:8], uint16(len(chunk))) + binary.BigEndian.PutUint16(frame[6:8], uint16(len(chunk))) //nolint:gosec // Length checked above binary.BigEndian.PutUint32(frame[8:12], seq) copy(frame[HeaderSize:], chunk) @@ -149,6 +160,7 @@ func (m *Multiplexer) SendData(sid uint16, data []byte) error { return nil } +// CloseStream signals that a stream should be terminated. func (m *Multiplexer) CloseStream(sid uint16) error { m.mu.Lock() defer m.mu.Unlock() @@ -176,6 +188,7 @@ func (m *Multiplexer) CloseStream(sid uint16) error { return nil } +// SendClientReset sends a control frame to reset all streams for this client. func (m *Multiplexer) SendClientReset() error { if m.clientID == 0 { return ErrClientResetID @@ -186,6 +199,7 @@ func (m *Multiplexer) SendClientReset() error { return nil } +// BuildControlFrame constructs a raw control frame. func BuildControlFrame(clientID uint32, controlType uint32) []byte { frame := make([]byte, HeaderSize) binary.BigEndian.PutUint32(frame[0:4], clientID) @@ -195,6 +209,7 @@ func BuildControlFrame(clientID uint32, controlType uint32) []byte { return frame } +// ParseControlFrame attempts to extract control information from a frame. func ParseControlFrame(frame []byte) (ControlFrame, bool) { if len(frame) < HeaderSize { return ControlFrame{}, false @@ -212,6 +227,7 @@ func ParseControlFrame(frame []byte) (ControlFrame, bool) { }, true } +// HandleFrame processes an incoming frame from the transport. func (m *Multiplexer) HandleFrame(frame []byte) { control, ok := ParseControlFrame(frame) if ok { @@ -336,6 +352,7 @@ func (m *Multiplexer) handleControlFrame(control ControlFrame) { } } +// ResetClient closes and removes all streams associated with a client ID. func (m *Multiplexer) ResetClient(clientID uint32) { m.mu.Lock() defer m.mu.Unlock() @@ -363,6 +380,7 @@ func (m *Multiplexer) waitForBufferSpace(sid uint16, clientID uint32, need int) } } +// ReadStream retrieves and clears the current receive buffer for a stream. func (m *Multiplexer) ReadStream(sid uint16) []byte { m.mu.Lock() defer m.mu.Unlock() @@ -381,6 +399,7 @@ func (m *Multiplexer) ReadStream(sid uint16) []byte { return data } +// StreamClosed returns true if the stream is closed or doesn't exist. func (m *Multiplexer) StreamClosed(sid uint16) bool { m.mu.RLock() defer m.mu.RUnlock() @@ -389,6 +408,7 @@ func (m *Multiplexer) StreamClosed(sid uint16) bool { return !exists || stream.closed } +// GetStreams returns a list of all active stream IDs. func (m *Multiplexer) GetStreams() []uint16 { m.mu.RLock() defer m.mu.RUnlock() @@ -400,12 +420,14 @@ func (m *Multiplexer) GetStreams() []uint16 { return sids } +// GetStream returns the Stream object for a given ID. func (m *Multiplexer) GetStream(sid uint16) *Stream { m.mu.RLock() defer m.mu.RUnlock() return m.streams[sid] } +// Reset clears all multiplexer state and closes all streams. func (m *Multiplexer) Reset() { m.mu.Lock() defer m.mu.Unlock() @@ -424,6 +446,7 @@ func (m *Multiplexer) Reset() { m.bufferCond.Broadcast() } +// UpdateSendFunc updates the function used to transmit raw frames. func (m *Multiplexer) UpdateSendFunc(onSend func([]byte) error) { m.mu.Lock() defer m.mu.Unlock() @@ -431,6 +454,7 @@ func (m *Multiplexer) UpdateSendFunc(onSend func([]byte) error) { m.onSend = onSend } +// WaitForData returns a channel that signals when new data is available for a stream. func (m *Multiplexer) WaitForData(sid uint16) <-chan struct{} { m.dataReadyMu.Lock() defer m.dataReadyMu.Unlock() @@ -441,6 +465,7 @@ func (m *Multiplexer) WaitForData(sid uint16) <-chan struct{} { return m.dataReady[sid] } +// CleanupDataChannel removes the data notification channel for a stream. func (m *Multiplexer) CleanupDataChannel(sid uint16) { m.dataReadyMu.Lock() defer m.dataReadyMu.Unlock() diff --git a/internal/provider/jazz/peer.go b/internal/provider/jazz/peer.go index 34428a2..04a6625 100644 --- a/internal/provider/jazz/peer.go +++ b/internal/provider/jazz/peer.go @@ -3,6 +3,7 @@ package jazz import ( "context" + "errors" "fmt" "log" "strings" @@ -533,12 +534,21 @@ func (p *Peer) Close() error { return nil } +var ( + // ErrPublisherNotInitialized is returned when the publisher peer connection is not set up. + ErrPublisherNotInitialized = errors.New("publisher peer connection not initialized") +) + // AddVideoTrack adds a video track to the publisher peer connection. func (p *Peer) AddVideoTrack(track *webrtc.TrackLocalStaticRTP) (*webrtc.RTPSender, error) { if p.pcPub == nil { - return nil, fmt.Errorf("publisher peer connection not initialized") + return nil, ErrPublisherNotInitialized } - return p.pcPub.AddTrack(track) + sender, err := p.pcPub.AddTrack(track) + if err != nil { + return nil, fmt.Errorf("failed to add track: %w", err) + } + return sender, nil } // SetReconnectCallback sets the callback for reconnection events. diff --git a/internal/provider/provider.go b/internal/provider/provider.go index 26cf4fc..09265ed 100644 --- a/internal/provider/provider.go +++ b/internal/provider/provider.go @@ -9,14 +9,21 @@ import ( ) var ( - ErrProviderNotFound = errors.New("provider not found") - ErrDataChannelTimeout = errors.New("datachannel timeout") + // ErrProviderNotFound is returned when a requested provider is not registered. + ErrProviderNotFound = errors.New("provider not found") + // ErrDataChannelTimeout is returned when the DataChannel fails to open within the timeout period. + ErrDataChannelTimeout = errors.New("datachannel timeout") + // ErrDataChannelNotReady is returned when attempting to send data before the DataChannel is open. ErrDataChannelNotReady = errors.New("datachannel not ready") - ErrSendQueueClosed = errors.New("send queue closed") - ErrSendQueueTimeout = errors.New("send queue timeout") + // ErrSendQueueClosed is returned when attempting to send data after the send queue has been closed. + ErrSendQueueClosed = errors.New("send queue closed") + // ErrSendQueueTimeout is returned when the send queue is full and the timeout is reached. + ErrSendQueueTimeout = errors.New("send queue timeout") ) // Provider defines the standard interface for WebRTC connection handlers. +// +//nolint:interfacebloat // All methods are necessary for provider abstraction. type Provider interface { Connect(ctx context.Context) error Send(data []byte) error @@ -47,6 +54,8 @@ type Config struct { type Factory func(ctx context.Context, cfg Config) (Provider, error) // registry holds all registered provider factories. +// +//nolint:gochecknoglobals // Global registry is required for provider discovery. var registry = make(map[string]Factory) // Register adds a new provider factory to the registry. diff --git a/internal/provider/telemost/peer.go b/internal/provider/telemost/peer.go index 30e3737..57cc785 100644 --- a/internal/provider/telemost/peer.go +++ b/internal/provider/telemost/peer.go @@ -1161,10 +1161,19 @@ func (p *Peer) CanSend() bool { return len(p.sendQueue) < 4000 } +var ( + // ErrPublisherNotInitialized is returned when the publisher peer connection is not set up. + ErrPublisherNotInitialized = errors.New("publisher peer connection not initialized") +) + // AddVideoTrack adds a video track to the publisher peer connection. func (p *Peer) AddVideoTrack(track *webrtc.TrackLocalStaticRTP) (*webrtc.RTPSender, error) { if p.pcPub == nil { - return nil, fmt.Errorf("publisher peer connection not initialized") + return nil, ErrPublisherNotInitialized } - return p.pcPub.AddTrack(track) + sender, err := p.pcPub.AddTrack(track) + if err != nil { + return nil, fmt.Errorf("failed to add track: %w", err) + } + return sender, nil } diff --git a/internal/provider/wbstream/peer.go b/internal/provider/wbstream/peer.go index 6a74920..51d80cb 100644 --- a/internal/provider/wbstream/peer.go +++ b/internal/provider/wbstream/peer.go @@ -18,8 +18,14 @@ const ( ) var ( - errPeerClosed = errors.New("peer closed") - errSendQueueFull = errors.New("send queue full") + // ErrPeerClosed is returned when an operation is attempted on a closed peer. + ErrPeerClosed = errors.New("peer closed") + // ErrSendQueueFull is returned when the transmission queue is full. + ErrSendQueueFull = errors.New("send queue full") + // ErrLiveKitNotConnected is returned when the LiveKit room is not connected. + ErrLiveKitNotConnected = errors.New("livekit room not connected") + // ErrVideoNotSupported is returned when video tracks are not supported by this provider. + ErrVideoNotSupported = errors.New("video tracks not supported yet in wbstream") ) // Peer represents a WB Stream WebRTC connection using LiveKit. @@ -137,13 +143,13 @@ func (p *Peer) processSendQueue() { // Send transmits data to the room. func (p *Peer) Send(data []byte) error { if p.closed.Load() { - return errPeerClosed + return ErrPeerClosed } select { case p.sendQueue <- data: return nil default: - return errSendQueueFull + return ErrSendQueueFull } } @@ -197,23 +203,15 @@ func (p *Peer) GetBufferedAmount() uint64 { // AddVideoTrack adds a video track to the LiveKit room. func (p *Peer) AddVideoTrack(track *webrtc.TrackLocalStaticRTP) (*webrtc.RTPSender, error) { if p.room == nil || p.room.LocalParticipant == nil { - return nil, fmt.Errorf("livekit room not connected") + return nil, ErrLiveKitNotConnected } - publication, err := p.room.LocalParticipant.PublishTrack(track, &lksdk.TrackPublicationOptions{ + _, err := p.room.LocalParticipant.PublishTrack(track, &lksdk.TrackPublicationOptions{ Name: "video", }) if err != nil { return nil, fmt.Errorf("failed to publish track: %w", err) } - // LiveKit SDK wraps RTPSender, but for the interface compatibility we might need to handle this differently. - // Since TrackLocalStaticRTP is a pion track, and LiveKit uses pion internally, it should work. - // However, LiveKit's PublishTrack doesn't return *webrtc.RTPSender directly. - // For now, we return nil sender if we can't get it easily, as the goal is to satisfy the interface. - if publication != nil { - return nil, nil // TODO: extract RTPSender if needed for VideoChannel - } - - return nil, nil + return nil, ErrVideoNotSupported } diff --git a/mobile/mobile.go b/mobile/mobile.go index b636113..25d2e27 100644 --- a/mobile/mobile.go +++ b/mobile/mobile.go @@ -5,6 +5,7 @@ package mobile import ( "context" "errors" + "fmt" "log" "sync" "time" @@ -111,7 +112,7 @@ func Start(roomID, keyHex string, socksPort int, socksUser, socksPass string) er "telemost", roomURL, keyHex, - socksPort, + fmt.Sprintf("127.0.0.1:%d", socksPort), "", socksUser, socksPass, From e22f942d78dfb053eb693481e6e0abef494b8a17 Mon Sep 17 00:00:00 2001 From: zarazaex69 Date: Mon, 20 Apr 2026 05:48:25 +0300 Subject: [PATCH 10/10] docs: remove redundant newline from client script section in readme --- readme.md | 1 - 1 file changed, 1 deletion(-) diff --git a/readme.md b/readme.md index 868f607..6f4f431 100644 --- a/readme.md +++ b/readme.md @@ -65,7 +65,6 @@ mage clean # client ( podman, pre configured, easy, unix ) ./script/cnc.sh - ```