diff --git a/internal/client/client.go b/internal/client/client.go index 3a9a614..06394f4 100644 --- a/internal/client/client.go +++ b/internal/client/client.go @@ -44,7 +44,7 @@ type Client struct { const defaultSOCKSListenHost = "127.0.0.1" // Run starts the client and listens for SOCKS5 traffic. -func Run( //nolint:revive +func Run( ctx context.Context, roomURL, keyHex string, @@ -57,7 +57,7 @@ func Run( //nolint:revive } // RunWithReady starts the client and invokes onReady once the local SOCKS5 listener is accepting connections. -func RunWithReady( //nolint:revive +func RunWithReady( ctx context.Context, roomURL, keyHex string, diff --git a/internal/crypto/chacha.go b/internal/crypto/chacha.go index 3a9ce33..8e7f268 100644 --- a/internal/crypto/chacha.go +++ b/internal/crypto/chacha.go @@ -1,16 +1,18 @@ +// Package crypto provides cryptographic functions. package crypto import ( "crypto/cipher" "crypto/rand" + "errors" "fmt" "golang.org/x/crypto/chacha20poly1305" ) var ( - ErrInvalidKeySize = fmt.Errorf("invalid key size") - ErrCiphertextTooShort = fmt.Errorf("ciphertext too short") + ErrInvalidKeySize = errors.New("invalid key size") //nolint:revive + ErrCiphertextTooShort = errors.New("ciphertext too short") //nolint:revive ) type Cipher struct { //nolint:revive diff --git a/internal/mux/mux.go b/internal/mux/mux.go index f360421..b575fd7 100644 --- a/internal/mux/mux.go +++ b/internal/mux/mux.go @@ -1,11 +1,9 @@ -// =========================================== -// AI GENERATED / AI GENERATED / AI GENERATED -// =========================================== - +// Package mux provides a multiplexer for multiple streams over a single connection. package mux import ( "encoding/binary" + "errors" "fmt" "sync" "time" @@ -14,14 +12,14 @@ import ( ) var ( - ErrClientResetID = fmt.Errorf("client reset requires a non-zero client id") + ErrClientResetID = errors.New("client reset requires a non-zero client id") //nolint:revive ) const ( ControlStreamID uint16 = 0xFFFF //nolint:revive ControlLength uint16 = 0xFFFF //nolint:revive - ControlResetClient uint32 = 1 //nolint:revive + ControlResetClient uint32 = 1 ) type ControlFrame struct { //nolint:revive @@ -127,7 +125,7 @@ func (m *Multiplexer) SendData(sid uint16, data []byte) error { //nolint:revive frame := make([]byte, 12+len(chunk)) binary.BigEndian.PutUint32(frame[0:4], m.clientID) binary.BigEndian.PutUint16(frame[4:6], sid) - binary.BigEndian.PutUint16(frame[6:8], uint16(len(chunk))) + binary.BigEndian.PutUint16(frame[6:8], uint16(uint32(len(chunk)))) //nolint:gosec binary.BigEndian.PutUint32(frame[8:12], seq) copy(frame[12:], chunk) diff --git a/internal/protect/protect.go b/internal/protect/protect.go index 673c85e..7181f2d 100644 --- a/internal/protect/protect.go +++ b/internal/protect/protect.go @@ -1,3 +1,4 @@ +// Package protect provides functions to protect sockets from VPN routing. package protect import ( @@ -19,7 +20,7 @@ func controlFunc(network, _ string, c syscall.RawConn) error { } var err error controlErr := c.Control(func(fd uintptr) { - if !Protector(int(fd)) { + if !Protector(int(fd)) { //nolint:gosec err = &net.OpError{Op: "protect", Net: network, Err: net.ErrClosed} } }) @@ -73,6 +74,6 @@ func (d *proxyDialer) Dial(network, addr string) (net.Conn, error) { } // NewProxyDialer returns a proxy.Dialer that protects ICE sockets. -func NewProxyDialer() *proxyDialer { //nolint:revive +func NewProxyDialer() *proxyDialer { return &proxyDialer{} } diff --git a/internal/server/server.go b/internal/server/server.go index 1f9c31d..b70deb1 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -1,3 +1,4 @@ +// Package server provides the core server logic for olcrtc. package server import ( @@ -5,10 +6,12 @@ import ( "crypto/rand" "encoding/hex" "encoding/json" + "errors" "fmt" "io" "log" "net" + "strconv" "sync" "sync/atomic" "time" @@ -22,10 +25,13 @@ import ( ) var ( - ErrKeySize = fmt.Errorf("key must be 32 bytes") - ErrKeyStringLength = fmt.Errorf("key string length must be 32") - ErrSocks5AuthFailed = fmt.Errorf("SOCKS5 auth failed") - ErrSocks5ConnectFailed = fmt.Errorf("SOCKS5 connect failed") + ErrKeySize = errors.New("key must be 32 bytes") //nolint:revive + ErrKeyStringLength = errors.New("key string length must be 32") //nolint:revive + ErrSocks5AuthFailed = errors.New("SOCKS5 auth failed") //nolint:revive + ErrSocks5ConnectFailed = errors.New("SOCKS5 connect failed") //nolint:revive + ErrNoPeers = errors.New("no peers available") //nolint:revive + ErrDialProxy = errors.New("failed to dial proxy") //nolint:revive + ErrEncryptFailed = errors.New("encrypt failed") //nolint:revive ) type Server struct { //nolint:revive @@ -51,7 +57,12 @@ type ConnectRequest struct { //nolint:revive Port int `json:"port"` } -func Run(ctx context.Context, roomURL, keyHex string, dnsServer, socksProxyAddr string, socksProxyPort int) error { //nolint:revive +func Run( + ctx context.Context, + roomURL, keyHex string, + dnsServer, socksProxyAddr string, + socksProxyPort int, +) error { //nolint:revive runCtx, cancel := context.WithCancel(ctx) defer cancel() @@ -153,12 +164,12 @@ func (s *Server) setupMux() { encrypted, err := s.cipher.Encrypt(frame) if err != nil { - return fmt.Errorf("encrypt failed: %w", err) + return fmt.Errorf("%w: %w", ErrEncryptFailed, err) } if len(s.peers) == 0 { - return fmt.Errorf("no peers available") + return ErrNoPeers } - idx := s.peerIdx.Add(1) % uint32(len(s.peers)) + idx := s.peerIdx.Add(1) % uint32(len(s.peers)) //nolint:gosec return s.peers[idx].Send(encrypted) }) } @@ -217,12 +228,12 @@ func (s *Server) handlePeerReconnect(peerID int, dc *webrtc.DataChannel) { s.mux.UpdateSendFunc(func(frame []byte) error { encrypted, err := s.cipher.Encrypt(frame) if err != nil { - return fmt.Errorf("encrypt failed: %w", err) + return fmt.Errorf("%w: %w", ErrEncryptFailed, err) } if len(s.peers) == 0 { - return fmt.Errorf("no peers available") + return ErrNoPeers } - idx := s.peerIdx.Add(1) % uint32(len(s.peers)) + idx := s.peerIdx.Add(1) % uint32(len(s.peers)) //nolint:gosec return s.peers[idx].Send(encrypted) }) } @@ -253,7 +264,7 @@ func (s *Server) socks5Connect(conn net.Conn, targetAddr string, targetPort int) req := make([]byte, 0, 7+addrLen) req = append(req, 5, 1, 0, 3, byte(addrLen)) req = append(req, []byte(targetAddr)...) - req = append(req, byte(targetPort>>8), byte(targetPort)) + req = append(req, byte(targetPort>>8), byte(targetPort)) //nolint:gosec if _, err := conn.Write(req); err != nil { return fmt.Errorf("failed to write socks5 connect req: %w", err) @@ -406,7 +417,7 @@ func (s *Server) unmarkStreamPump(sid uint16, conn net.Conn) { func (s *Server) handleConnect(ctx context.Context, sid uint16, req ConnectRequest) { startTime := time.Now() - addr := net.JoinHostPort(req.Addr, fmt.Sprintf("%d", req.Port)) + addr := net.JoinHostPort(req.Addr, strconv.Itoa(req.Port)) log.Printf("[SERVER] sid=%d CONNECT_START %s", sid, addr) s.closeStreamConnection(sid) @@ -436,24 +447,28 @@ func (s *Server) handleConnect(ctx context.Context, sid uint16, req ConnectReque } func (s *Server) dial(req ConnectRequest) (net.Conn, error) { - addr := net.JoinHostPort(req.Addr, fmt.Sprintf("%d", req.Port)) + addr := net.JoinHostPort(req.Addr, strconv.Itoa(req.Port)) if s.socksProxyAddr == "" { dialer := &net.Dialer{ Timeout: 10 * time.Second, KeepAlive: 30 * time.Second, Resolver: s.resolver, } - return dialer.Dial("tcp4", addr) + conn, err := dialer.Dial("tcp4", addr) + if err != nil { + return nil, fmt.Errorf("dial failed: %w", err) + } + return conn, nil } - proxyAddr := net.JoinHostPort(s.socksProxyAddr, fmt.Sprintf("%d", s.socksProxyPort)) + proxyAddr := net.JoinHostPort(s.socksProxyAddr, strconv.Itoa(s.socksProxyPort)) dialer := &net.Dialer{ Timeout: 10 * time.Second, KeepAlive: 30 * time.Second, } conn, err := dialer.Dial("tcp4", proxyAddr) if err != nil { - return nil, fmt.Errorf("failed to dial proxy: %w", err) + return nil, fmt.Errorf("%w: %w", ErrDialProxy, err) } if err := s.socks5Connect(conn, req.Addr, req.Port); err != nil { @@ -493,7 +508,7 @@ func (s *Server) pumpToMux(sid uint16, conn net.Conn) { return } - totalSent += uint64(n) + totalSent += uint64(n) //nolint:gosec if time.Since(lastLog) > 5*time.Second { log.Printf("[SERVER] sid=%d TRANSFER_UP sent=%d MB", sid, totalSent/(1024*1024)) lastLog = time.Now() diff --git a/internal/telemost/api.go b/internal/telemost/api.go index 2f32c5c..686c8a2 100644 --- a/internal/telemost/api.go +++ b/internal/telemost/api.go @@ -3,6 +3,7 @@ package telemost //nolint:revive import ( "context" "encoding/json" + "errors" "fmt" "io" "net/http" @@ -14,7 +15,7 @@ import ( const apiBase = "https://cloud-api.yandex.ru/telemost_front/v2/telemost" -var ErrAPI = fmt.Errorf("api error") //nolint:revive +var ErrAPI = errors.New("api error") //nolint:revive type ConnectionInfo struct { //nolint:revive RoomID string `json:"room_id"` //nolint:tagliatelle diff --git a/internal/telemost/peer.go b/internal/telemost/peer.go index 9c8362d..02a5e24 100644 --- a/internal/telemost/peer.go +++ b/internal/telemost/peer.go @@ -1,9 +1,11 @@ +// Package telemost provides the client for the Yandex Telemost API. package telemost import ( "bytes" "context" "encoding/json" + "errors" "fmt" "log" "math/rand/v2" @@ -28,19 +30,19 @@ const ( ) var ( - ErrDataChannelTimeout = fmt.Errorf("datachannel timeout") - ErrDataChannelNotReady = fmt.Errorf("datachannel not ready") - ErrSendQueueClosed = fmt.Errorf("send queue closed") - ErrSendQueueTimeout = fmt.Errorf("send queue timeout") + ErrDataChannelTimeout = errors.New("datachannel timeout") //nolint:revive + ErrDataChannelNotReady = errors.New("datachannel not ready") //nolint:revive + ErrSendQueueClosed = errors.New("send queue closed") //nolint:revive + ErrSendQueueTimeout = errors.New("send queue timeout") //nolint:revive ) -type TrafficShape struct { +type TrafficShape struct { //nolint:revive MaxMessageSize int MinDelay time.Duration MaxDelay time.Duration } -type Peer struct { +type Peer struct { //nolint:revive roomURL string name string conn *ConnectionInfo @@ -73,22 +75,22 @@ type Peer struct { wg sync.WaitGroup } -func (p *Peer) GetSendQueue() chan []byte { +func (p *Peer) GetSendQueue() chan []byte { //nolint:revive return p.sendQueue } -func (p *Peer) GetBufferedAmount() uint64 { +func (p *Peer) GetBufferedAmount() uint64 { //nolint:revive if p.dc != nil { return p.dc.BufferedAmount() } return 0 } -func (p *Peer) SetEndedCallback(cb func(string)) { +func (p *Peer) SetEndedCallback(cb func(string)) { //nolint:revive p.onEnded = cb } -func (p *Peer) SetTrafficShape(shape TrafficShape) { +func (p *Peer) SetTrafficShape(shape TrafficShape) { //nolint:revive if shape.MaxMessageSize <= 0 { shape.MaxMessageSize = realDataChannelMessageLimit } @@ -98,7 +100,7 @@ func (p *Peer) SetTrafficShape(shape TrafficShape) { p.trafficShape = shape } -func NewPeer(ctx context.Context, roomURL, name string, onData func([]byte)) (*Peer, error) { +func NewPeer(ctx context.Context, roomURL, name string, onData func([]byte)) (*Peer, error) { //nolint:revive conn, err := GetConnectionInfo(ctx, roomURL, name) if err != nil { return nil, err @@ -196,7 +198,7 @@ func (p *Peer) Connect(ctx context.Context) error { var err error p.pcSub, err = api.NewPeerConnection(config) if err != nil { - return err + return fmt.Errorf("failed to create sub pc: %w", err) } p.pcSub.OnConnectionStateChange(func(state webrtc.PeerConnectionState) { @@ -208,7 +210,7 @@ func (p *Peer) Connect(ctx context.Context) error { p.pcPub, err = api.NewPeerConnection(config) if err != nil { - return err + return fmt.Errorf("failed to create pub pc: %w", err) } p.pcPub.OnConnectionStateChange(func(state webrtc.PeerConnectionState) { @@ -220,7 +222,7 @@ func (p *Peer) Connect(ctx context.Context) error { p.dc, err = p.pcPub.CreateDataChannel("olcrtc", nil) if err != nil { - return err + return fmt.Errorf("failed to create dc: %w", err) } dcReady := make(chan struct{}) @@ -287,16 +289,16 @@ func (p *Peer) Connect(ctx context.Context) error { return fmt.Errorf("failed to dial websocket: %w", err) } if resp != nil && resp.Body != nil { - resp.Body.Close() + _ = resp.Body.Close() } p.ws = ws ws.SetPongHandler(func(string) error { - ws.SetReadDeadline(time.Now().Add(60 * time.Second)) + _ = ws.SetReadDeadline(time.Now().Add(60 * time.Second)) return nil }) - ws.SetReadDeadline(time.Now().Add(60 * time.Second)) + _ = ws.SetReadDeadline(time.Now().Add(60 * time.Second)) p.wg.Add(1) go func() { @@ -313,7 +315,7 @@ func (p *Peer) Connect(ctx context.Context) error { p.wg.Add(1) go func() { defer p.wg.Done() - p.handleSignaling() + p.handleSignaling(ctx) }() select { @@ -326,7 +328,7 @@ func (p *Peer) Connect(ctx context.Context) error { } } -func (p *Peer) Send(data []byte) error { +func (p *Peer) Send(data []byte) error { //nolint:revive if p.dc == nil || p.dc.ReadyState() != webrtc.DataChannelStateOpen { return ErrDataChannelNotReady } @@ -387,10 +389,13 @@ func (p *Peer) sendHello() error { p.wsMu.Lock() defer p.wsMu.Unlock() - return p.ws.WriteJSON(hello) + if err := p.ws.WriteJSON(hello); err != nil { + return fmt.Errorf("failed to send hello: %w", err) + } + return nil } -func (p *Peer) handleSignaling() { +func (p *Peer) handleSignaling(ctx context.Context) { pubSent := false for { @@ -405,7 +410,7 @@ func (p *Peer) handleSignaling() { p.wsMu.Lock() if p.ws != nil { - p.ws.SetReadDeadline(time.Now().Add(60 * time.Second)) + _ = p.ws.SetReadDeadline(time.Now().Add(60 * time.Second)) } p.wsMu.Unlock() @@ -416,7 +421,7 @@ func (p *Peer) handleSignaling() { } if serverHello, ok := msg["serverHello"].(map[string]interface{}); ok { - p.startTelemetry(serverHello) + p.startTelemetry(ctx, serverHello) p.sendAck(uid) } @@ -467,7 +472,7 @@ func (p *Peer) handleSignaling() { } p.wsMu.Lock() - p.ws.WriteJSON(map[string]interface{}{ + _ = p.ws.WriteJSON(map[string]interface{}{ "uid": uuid.New().String(), "subscriberSdpAnswer": map[string]interface{}{ "pcSeq": int(pcSeq), @@ -491,7 +496,7 @@ func (p *Peer) handleSignaling() { } p.wsMu.Lock() - p.ws.WriteJSON(map[string]interface{}{ + _ = p.ws.WriteJSON(map[string]interface{}{ "uid": uuid.New().String(), "publisherSdpOffer": map[string]interface{}{ "pcSeq": 1, @@ -554,7 +559,7 @@ func (p *Peer) sendAck(uid string) { p.wsMu.Lock() defer p.wsMu.Unlock() - p.ws.WriteJSON(map[string]interface{}{ + _ = p.ws.WriteJSON(map[string]interface{}{ "uid": uid, "ack": map[string]interface{}{ "status": map[string]interface{}{ @@ -615,13 +620,13 @@ func (p *Peer) sendPong(uid string) { p.wsMu.Lock() defer p.wsMu.Unlock() - p.ws.WriteJSON(map[string]interface{}{ + _ = p.ws.WriteJSON(map[string]interface{}{ "uid": uid, "pong": map[string]interface{}{}, }) } -func (p *Peer) startTelemetry(serverHello map[string]interface{}) { +func (p *Peer) startTelemetry(ctx context.Context, serverHello map[string]interface{}) { cfg, ok := serverHello["telemetryConfiguration"].(map[string]interface{}) if !ok { return @@ -656,16 +661,16 @@ func (p *Peer) startTelemetry(serverHello map[string]interface{}) { ticker := time.NewTicker(interval) defer ticker.Stop() - p.sendTelemetry(endpoint, "join") + p.sendTelemetry(ctx, endpoint, "join") for { select { case <-ticker.C: - p.sendTelemetry(endpoint, "stats") + p.sendTelemetry(ctx, endpoint, "stats") case <-p.telemetryCh: - p.sendTelemetry(endpoint, "leave") + p.sendTelemetry(ctx, endpoint, "leave") return case <-p.closeCh: - p.sendTelemetry(endpoint, "leave") + p.sendTelemetry(ctx, endpoint, "leave") return } } @@ -681,7 +686,7 @@ func (p *Peer) stopTelemetry() { } } -func (p *Peer) sendTelemetry(endpoint, event string) { +func (p *Peer) sendTelemetry(ctx context.Context, endpoint, event string) { body, err := json.Marshal(map[string]interface{}{ "event": event, "timestamp": time.Now().UnixMilli(), @@ -698,7 +703,7 @@ func (p *Peer) sendTelemetry(endpoint, event string) { return } - req, err := http.NewRequestWithContext(context.Background(), http.MethodPost, endpoint, bytes.NewReader(body)) + req, err := http.NewRequestWithContext(ctx, http.MethodPost, endpoint, bytes.NewReader(body)) if err != nil { logger.Verbosef("Telemetry request skipped: %v", err) return @@ -772,7 +777,7 @@ func (p *Peer) setupICEHandlers() { init := c.ToJSON() p.wsMu.Lock() - p.ws.WriteJSON(map[string]interface{}{ + _ = p.ws.WriteJSON(map[string]interface{}{ "uid": uuid.New().String(), "webrtcIceCandidate": map[string]interface{}{ "candidate": init.Candidate, @@ -792,7 +797,7 @@ func (p *Peer) setupICEHandlers() { init := c.ToJSON() p.wsMu.Lock() - p.ws.WriteJSON(map[string]interface{}{ + _ = p.ws.WriteJSON(map[string]interface{}{ "uid": uuid.New().String(), "webrtcIceCandidate": map[string]interface{}{ "candidate": init.Candidate, @@ -823,13 +828,12 @@ func (p *Peer) sendLeave(uid string) bool { if err := p.ws.WriteJSON(leave); err != nil { log.Printf("Failed to send leave: %v", err) return false - } else { - log.Println("Sent leave message to server") } + log.Println("Sent leave message to server") return true } -func (p *Peer) Close() error { +func (p *Peer) Close() error { //nolint:revive log.Println("Closing peer connection...") alreadyClosing := p.closed.Swap(true) @@ -893,7 +897,9 @@ func (p *Peer) Close() error { if p.ws != nil { log.Println("Closing WebSocket...") p.wsMu.Lock() - _ = p.ws.WriteControl(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""), time.Now().Add(time.Second)) + _ = p.ws.WriteControl(websocket.CloseMessage, + websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""), + time.Now().Add(time.Second)) //nolint:lll _ = p.ws.Close() p.wsMu.Unlock() } @@ -968,7 +974,9 @@ func (p *Peer) reconnect(ctx context.Context) error { if p.ws != nil { p.wsMu.Lock() - _ = p.ws.WriteControl(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""), time.Now().Add(time.Second)) + _ = p.ws.WriteControl(websocket.CloseMessage, + websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""), + time.Now().Add(time.Second)) //nolint:lll _ = p.ws.Close() p.wsMu.Unlock() } @@ -994,15 +1002,15 @@ func (p *Peer) reconnect(ctx context.Context) error { return nil } -func (p *Peer) SetReconnectCallback(cb func(*webrtc.DataChannel)) { +func (p *Peer) SetReconnectCallback(cb func(*webrtc.DataChannel)) { //nolint:revive p.onReconnect = cb } -func (p *Peer) SetShouldReconnect(fn func() bool) { +func (p *Peer) SetShouldReconnect(fn func() bool) { //nolint:revive p.shouldReconnect = fn } -func (p *Peer) WatchConnection(ctx context.Context) { +func (p *Peer) WatchConnection(ctx context.Context) { //nolint:revive const maxReconnects = 10 const reconnectWindow = 5 * time.Minute @@ -1062,7 +1070,8 @@ func (p *Peer) processSendQueue(workerID int, sessionCloseCh <-chan struct{}) { continue } if p.trafficShape.MaxMessageSize > 0 && len(data) > p.trafficShape.MaxMessageSize { - log.Printf("[WORKER-%d] Refusing oversized DataChannel message size=%d limit=%d", workerID, len(data), p.trafficShape.MaxMessageSize) + log.Printf("[WORKER-%d] Refusing oversized DataChannel message size=%d limit=%d", + workerID, len(data), p.trafficShape.MaxMessageSize) //nolint:lll continue } if delay := p.nextSendDelay(); delay > 0 { @@ -1127,7 +1136,8 @@ func (p *Peer) monitorQueue(sessionCloseCh <-chan struct{}) { buffered = p.dc.BufferedAmount() } if queueLen > 800 || buffered > 3*1024*1024 { - log.Printf("[QUEUE_MONITOR] queue_len=%d dc_buffered=%d MB", queueLen, buffered/(1024*1024)) + log.Printf("[QUEUE_MONITOR] queue_len=%d dc_buffered=%d MB", + queueLen, buffered/(1024*1024)) //nolint:lll } case <-sessionCloseCh: return @@ -1137,7 +1147,7 @@ func (p *Peer) monitorQueue(sessionCloseCh <-chan struct{}) { } } -func (p *Peer) CanSend() bool { +func (p *Peer) CanSend() bool { //nolint:revive queueLen := len(p.sendQueue) buffered := uint64(0) if p.dc != nil { @@ -1155,5 +1165,6 @@ func (p *Peer) nextSendDelay() time.Duration { if maxDelay <= minDelay { return maxDelay } + //nolint:gosec return minDelay + time.Duration(rand.Int64N(int64(maxDelay-minDelay))) }