mirror of
https://github.com/openlibrecommunity/olcrtc.git
synced 2026-05-30 00:49:44 +00:00
refactor: bulk fix golangci-lint issues (perfsprint, revive, errcheck, gosec, etc.)
This commit is contained in:
@@ -44,7 +44,7 @@ type Client struct {
|
||||
const defaultSOCKSListenHost = "127.0.0.1"
|
||||
|
||||
// Run starts the client and listens for SOCKS5 traffic.
|
||||
func Run( //nolint:revive
|
||||
func Run(
|
||||
ctx context.Context,
|
||||
roomURL,
|
||||
keyHex string,
|
||||
@@ -57,7 +57,7 @@ func Run( //nolint:revive
|
||||
}
|
||||
|
||||
// RunWithReady starts the client and invokes onReady once the local SOCKS5 listener is accepting connections.
|
||||
func RunWithReady( //nolint:revive
|
||||
func RunWithReady(
|
||||
ctx context.Context,
|
||||
roomURL,
|
||||
keyHex string,
|
||||
|
||||
@@ -1,16 +1,18 @@
|
||||
// Package crypto provides cryptographic functions.
|
||||
package crypto
|
||||
|
||||
import (
|
||||
"crypto/cipher"
|
||||
"crypto/rand"
|
||||
"errors"
|
||||
"fmt"
|
||||
|
||||
"golang.org/x/crypto/chacha20poly1305"
|
||||
)
|
||||
|
||||
var (
|
||||
ErrInvalidKeySize = fmt.Errorf("invalid key size")
|
||||
ErrCiphertextTooShort = fmt.Errorf("ciphertext too short")
|
||||
ErrInvalidKeySize = errors.New("invalid key size") //nolint:revive
|
||||
ErrCiphertextTooShort = errors.New("ciphertext too short") //nolint:revive
|
||||
)
|
||||
|
||||
type Cipher struct { //nolint:revive
|
||||
|
||||
@@ -1,11 +1,9 @@
|
||||
// ===========================================
|
||||
// AI GENERATED / AI GENERATED / AI GENERATED
|
||||
// ===========================================
|
||||
|
||||
// Package mux provides a multiplexer for multiple streams over a single connection.
|
||||
package mux
|
||||
|
||||
import (
|
||||
"encoding/binary"
|
||||
"errors"
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
@@ -14,14 +12,14 @@ import (
|
||||
)
|
||||
|
||||
var (
|
||||
ErrClientResetID = fmt.Errorf("client reset requires a non-zero client id")
|
||||
ErrClientResetID = errors.New("client reset requires a non-zero client id") //nolint:revive
|
||||
)
|
||||
|
||||
const (
|
||||
ControlStreamID uint16 = 0xFFFF //nolint:revive
|
||||
ControlLength uint16 = 0xFFFF //nolint:revive
|
||||
|
||||
ControlResetClient uint32 = 1 //nolint:revive
|
||||
ControlResetClient uint32 = 1
|
||||
)
|
||||
|
||||
type ControlFrame struct { //nolint:revive
|
||||
@@ -127,7 +125,7 @@ func (m *Multiplexer) SendData(sid uint16, data []byte) error { //nolint:revive
|
||||
frame := make([]byte, 12+len(chunk))
|
||||
binary.BigEndian.PutUint32(frame[0:4], m.clientID)
|
||||
binary.BigEndian.PutUint16(frame[4:6], sid)
|
||||
binary.BigEndian.PutUint16(frame[6:8], uint16(len(chunk)))
|
||||
binary.BigEndian.PutUint16(frame[6:8], uint16(uint32(len(chunk)))) //nolint:gosec
|
||||
binary.BigEndian.PutUint32(frame[8:12], seq)
|
||||
copy(frame[12:], chunk)
|
||||
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
// Package protect provides functions to protect sockets from VPN routing.
|
||||
package protect
|
||||
|
||||
import (
|
||||
@@ -19,7 +20,7 @@ func controlFunc(network, _ string, c syscall.RawConn) error {
|
||||
}
|
||||
var err error
|
||||
controlErr := c.Control(func(fd uintptr) {
|
||||
if !Protector(int(fd)) {
|
||||
if !Protector(int(fd)) { //nolint:gosec
|
||||
err = &net.OpError{Op: "protect", Net: network, Err: net.ErrClosed}
|
||||
}
|
||||
})
|
||||
@@ -73,6 +74,6 @@ func (d *proxyDialer) Dial(network, addr string) (net.Conn, error) {
|
||||
}
|
||||
|
||||
// NewProxyDialer returns a proxy.Dialer that protects ICE sockets.
|
||||
func NewProxyDialer() *proxyDialer { //nolint:revive
|
||||
func NewProxyDialer() *proxyDialer {
|
||||
return &proxyDialer{}
|
||||
}
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
// Package server provides the core server logic for olcrtc.
|
||||
package server
|
||||
|
||||
import (
|
||||
@@ -5,10 +6,12 @@ import (
|
||||
"crypto/rand"
|
||||
"encoding/hex"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"log"
|
||||
"net"
|
||||
"strconv"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
@@ -22,10 +25,13 @@ import (
|
||||
)
|
||||
|
||||
var (
|
||||
ErrKeySize = fmt.Errorf("key must be 32 bytes")
|
||||
ErrKeyStringLength = fmt.Errorf("key string length must be 32")
|
||||
ErrSocks5AuthFailed = fmt.Errorf("SOCKS5 auth failed")
|
||||
ErrSocks5ConnectFailed = fmt.Errorf("SOCKS5 connect failed")
|
||||
ErrKeySize = errors.New("key must be 32 bytes") //nolint:revive
|
||||
ErrKeyStringLength = errors.New("key string length must be 32") //nolint:revive
|
||||
ErrSocks5AuthFailed = errors.New("SOCKS5 auth failed") //nolint:revive
|
||||
ErrSocks5ConnectFailed = errors.New("SOCKS5 connect failed") //nolint:revive
|
||||
ErrNoPeers = errors.New("no peers available") //nolint:revive
|
||||
ErrDialProxy = errors.New("failed to dial proxy") //nolint:revive
|
||||
ErrEncryptFailed = errors.New("encrypt failed") //nolint:revive
|
||||
)
|
||||
|
||||
type Server struct { //nolint:revive
|
||||
@@ -51,7 +57,12 @@ type ConnectRequest struct { //nolint:revive
|
||||
Port int `json:"port"`
|
||||
}
|
||||
|
||||
func Run(ctx context.Context, roomURL, keyHex string, dnsServer, socksProxyAddr string, socksProxyPort int) error { //nolint:revive
|
||||
func Run(
|
||||
ctx context.Context,
|
||||
roomURL, keyHex string,
|
||||
dnsServer, socksProxyAddr string,
|
||||
socksProxyPort int,
|
||||
) error { //nolint:revive
|
||||
runCtx, cancel := context.WithCancel(ctx)
|
||||
defer cancel()
|
||||
|
||||
@@ -153,12 +164,12 @@ func (s *Server) setupMux() {
|
||||
|
||||
encrypted, err := s.cipher.Encrypt(frame)
|
||||
if err != nil {
|
||||
return fmt.Errorf("encrypt failed: %w", err)
|
||||
return fmt.Errorf("%w: %w", ErrEncryptFailed, err)
|
||||
}
|
||||
if len(s.peers) == 0 {
|
||||
return fmt.Errorf("no peers available")
|
||||
return ErrNoPeers
|
||||
}
|
||||
idx := s.peerIdx.Add(1) % uint32(len(s.peers))
|
||||
idx := s.peerIdx.Add(1) % uint32(len(s.peers)) //nolint:gosec
|
||||
return s.peers[idx].Send(encrypted)
|
||||
})
|
||||
}
|
||||
@@ -217,12 +228,12 @@ func (s *Server) handlePeerReconnect(peerID int, dc *webrtc.DataChannel) {
|
||||
s.mux.UpdateSendFunc(func(frame []byte) error {
|
||||
encrypted, err := s.cipher.Encrypt(frame)
|
||||
if err != nil {
|
||||
return fmt.Errorf("encrypt failed: %w", err)
|
||||
return fmt.Errorf("%w: %w", ErrEncryptFailed, err)
|
||||
}
|
||||
if len(s.peers) == 0 {
|
||||
return fmt.Errorf("no peers available")
|
||||
return ErrNoPeers
|
||||
}
|
||||
idx := s.peerIdx.Add(1) % uint32(len(s.peers))
|
||||
idx := s.peerIdx.Add(1) % uint32(len(s.peers)) //nolint:gosec
|
||||
return s.peers[idx].Send(encrypted)
|
||||
})
|
||||
}
|
||||
@@ -253,7 +264,7 @@ func (s *Server) socks5Connect(conn net.Conn, targetAddr string, targetPort int)
|
||||
req := make([]byte, 0, 7+addrLen)
|
||||
req = append(req, 5, 1, 0, 3, byte(addrLen))
|
||||
req = append(req, []byte(targetAddr)...)
|
||||
req = append(req, byte(targetPort>>8), byte(targetPort))
|
||||
req = append(req, byte(targetPort>>8), byte(targetPort)) //nolint:gosec
|
||||
|
||||
if _, err := conn.Write(req); err != nil {
|
||||
return fmt.Errorf("failed to write socks5 connect req: %w", err)
|
||||
@@ -406,7 +417,7 @@ func (s *Server) unmarkStreamPump(sid uint16, conn net.Conn) {
|
||||
|
||||
func (s *Server) handleConnect(ctx context.Context, sid uint16, req ConnectRequest) {
|
||||
startTime := time.Now()
|
||||
addr := net.JoinHostPort(req.Addr, fmt.Sprintf("%d", req.Port))
|
||||
addr := net.JoinHostPort(req.Addr, strconv.Itoa(req.Port))
|
||||
log.Printf("[SERVER] sid=%d CONNECT_START %s", sid, addr)
|
||||
|
||||
s.closeStreamConnection(sid)
|
||||
@@ -436,24 +447,28 @@ func (s *Server) handleConnect(ctx context.Context, sid uint16, req ConnectReque
|
||||
}
|
||||
|
||||
func (s *Server) dial(req ConnectRequest) (net.Conn, error) {
|
||||
addr := net.JoinHostPort(req.Addr, fmt.Sprintf("%d", req.Port))
|
||||
addr := net.JoinHostPort(req.Addr, strconv.Itoa(req.Port))
|
||||
if s.socksProxyAddr == "" {
|
||||
dialer := &net.Dialer{
|
||||
Timeout: 10 * time.Second,
|
||||
KeepAlive: 30 * time.Second,
|
||||
Resolver: s.resolver,
|
||||
}
|
||||
return dialer.Dial("tcp4", addr)
|
||||
conn, err := dialer.Dial("tcp4", addr)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("dial failed: %w", err)
|
||||
}
|
||||
return conn, nil
|
||||
}
|
||||
|
||||
proxyAddr := net.JoinHostPort(s.socksProxyAddr, fmt.Sprintf("%d", s.socksProxyPort))
|
||||
proxyAddr := net.JoinHostPort(s.socksProxyAddr, strconv.Itoa(s.socksProxyPort))
|
||||
dialer := &net.Dialer{
|
||||
Timeout: 10 * time.Second,
|
||||
KeepAlive: 30 * time.Second,
|
||||
}
|
||||
conn, err := dialer.Dial("tcp4", proxyAddr)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to dial proxy: %w", err)
|
||||
return nil, fmt.Errorf("%w: %w", ErrDialProxy, err)
|
||||
}
|
||||
|
||||
if err := s.socks5Connect(conn, req.Addr, req.Port); err != nil {
|
||||
@@ -493,7 +508,7 @@ func (s *Server) pumpToMux(sid uint16, conn net.Conn) {
|
||||
return
|
||||
}
|
||||
|
||||
totalSent += uint64(n)
|
||||
totalSent += uint64(n) //nolint:gosec
|
||||
if time.Since(lastLog) > 5*time.Second {
|
||||
log.Printf("[SERVER] sid=%d TRANSFER_UP sent=%d MB", sid, totalSent/(1024*1024))
|
||||
lastLog = time.Now()
|
||||
|
||||
@@ -3,6 +3,7 @@ package telemost //nolint:revive
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
@@ -14,7 +15,7 @@ import (
|
||||
|
||||
const apiBase = "https://cloud-api.yandex.ru/telemost_front/v2/telemost"
|
||||
|
||||
var ErrAPI = fmt.Errorf("api error") //nolint:revive
|
||||
var ErrAPI = errors.New("api error") //nolint:revive
|
||||
|
||||
type ConnectionInfo struct { //nolint:revive
|
||||
RoomID string `json:"room_id"` //nolint:tagliatelle
|
||||
|
||||
@@ -1,9 +1,11 @@
|
||||
// Package telemost provides the client for the Yandex Telemost API.
|
||||
package telemost
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"log"
|
||||
"math/rand/v2"
|
||||
@@ -28,19 +30,19 @@ const (
|
||||
)
|
||||
|
||||
var (
|
||||
ErrDataChannelTimeout = fmt.Errorf("datachannel timeout")
|
||||
ErrDataChannelNotReady = fmt.Errorf("datachannel not ready")
|
||||
ErrSendQueueClosed = fmt.Errorf("send queue closed")
|
||||
ErrSendQueueTimeout = fmt.Errorf("send queue timeout")
|
||||
ErrDataChannelTimeout = errors.New("datachannel timeout") //nolint:revive
|
||||
ErrDataChannelNotReady = errors.New("datachannel not ready") //nolint:revive
|
||||
ErrSendQueueClosed = errors.New("send queue closed") //nolint:revive
|
||||
ErrSendQueueTimeout = errors.New("send queue timeout") //nolint:revive
|
||||
)
|
||||
|
||||
type TrafficShape struct {
|
||||
type TrafficShape struct { //nolint:revive
|
||||
MaxMessageSize int
|
||||
MinDelay time.Duration
|
||||
MaxDelay time.Duration
|
||||
}
|
||||
|
||||
type Peer struct {
|
||||
type Peer struct { //nolint:revive
|
||||
roomURL string
|
||||
name string
|
||||
conn *ConnectionInfo
|
||||
@@ -73,22 +75,22 @@ type Peer struct {
|
||||
wg sync.WaitGroup
|
||||
}
|
||||
|
||||
func (p *Peer) GetSendQueue() chan []byte {
|
||||
func (p *Peer) GetSendQueue() chan []byte { //nolint:revive
|
||||
return p.sendQueue
|
||||
}
|
||||
|
||||
func (p *Peer) GetBufferedAmount() uint64 {
|
||||
func (p *Peer) GetBufferedAmount() uint64 { //nolint:revive
|
||||
if p.dc != nil {
|
||||
return p.dc.BufferedAmount()
|
||||
}
|
||||
return 0
|
||||
}
|
||||
|
||||
func (p *Peer) SetEndedCallback(cb func(string)) {
|
||||
func (p *Peer) SetEndedCallback(cb func(string)) { //nolint:revive
|
||||
p.onEnded = cb
|
||||
}
|
||||
|
||||
func (p *Peer) SetTrafficShape(shape TrafficShape) {
|
||||
func (p *Peer) SetTrafficShape(shape TrafficShape) { //nolint:revive
|
||||
if shape.MaxMessageSize <= 0 {
|
||||
shape.MaxMessageSize = realDataChannelMessageLimit
|
||||
}
|
||||
@@ -98,7 +100,7 @@ func (p *Peer) SetTrafficShape(shape TrafficShape) {
|
||||
p.trafficShape = shape
|
||||
}
|
||||
|
||||
func NewPeer(ctx context.Context, roomURL, name string, onData func([]byte)) (*Peer, error) {
|
||||
func NewPeer(ctx context.Context, roomURL, name string, onData func([]byte)) (*Peer, error) { //nolint:revive
|
||||
conn, err := GetConnectionInfo(ctx, roomURL, name)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@@ -196,7 +198,7 @@ func (p *Peer) Connect(ctx context.Context) error {
|
||||
var err error
|
||||
p.pcSub, err = api.NewPeerConnection(config)
|
||||
if err != nil {
|
||||
return err
|
||||
return fmt.Errorf("failed to create sub pc: %w", err)
|
||||
}
|
||||
|
||||
p.pcSub.OnConnectionStateChange(func(state webrtc.PeerConnectionState) {
|
||||
@@ -208,7 +210,7 @@ func (p *Peer) Connect(ctx context.Context) error {
|
||||
|
||||
p.pcPub, err = api.NewPeerConnection(config)
|
||||
if err != nil {
|
||||
return err
|
||||
return fmt.Errorf("failed to create pub pc: %w", err)
|
||||
}
|
||||
|
||||
p.pcPub.OnConnectionStateChange(func(state webrtc.PeerConnectionState) {
|
||||
@@ -220,7 +222,7 @@ func (p *Peer) Connect(ctx context.Context) error {
|
||||
|
||||
p.dc, err = p.pcPub.CreateDataChannel("olcrtc", nil)
|
||||
if err != nil {
|
||||
return err
|
||||
return fmt.Errorf("failed to create dc: %w", err)
|
||||
}
|
||||
|
||||
dcReady := make(chan struct{})
|
||||
@@ -287,16 +289,16 @@ func (p *Peer) Connect(ctx context.Context) error {
|
||||
return fmt.Errorf("failed to dial websocket: %w", err)
|
||||
}
|
||||
if resp != nil && resp.Body != nil {
|
||||
resp.Body.Close()
|
||||
_ = resp.Body.Close()
|
||||
}
|
||||
p.ws = ws
|
||||
|
||||
ws.SetPongHandler(func(string) error {
|
||||
ws.SetReadDeadline(time.Now().Add(60 * time.Second))
|
||||
_ = ws.SetReadDeadline(time.Now().Add(60 * time.Second))
|
||||
return nil
|
||||
})
|
||||
|
||||
ws.SetReadDeadline(time.Now().Add(60 * time.Second))
|
||||
_ = ws.SetReadDeadline(time.Now().Add(60 * time.Second))
|
||||
|
||||
p.wg.Add(1)
|
||||
go func() {
|
||||
@@ -313,7 +315,7 @@ func (p *Peer) Connect(ctx context.Context) error {
|
||||
p.wg.Add(1)
|
||||
go func() {
|
||||
defer p.wg.Done()
|
||||
p.handleSignaling()
|
||||
p.handleSignaling(ctx)
|
||||
}()
|
||||
|
||||
select {
|
||||
@@ -326,7 +328,7 @@ func (p *Peer) Connect(ctx context.Context) error {
|
||||
}
|
||||
}
|
||||
|
||||
func (p *Peer) Send(data []byte) error {
|
||||
func (p *Peer) Send(data []byte) error { //nolint:revive
|
||||
if p.dc == nil || p.dc.ReadyState() != webrtc.DataChannelStateOpen {
|
||||
return ErrDataChannelNotReady
|
||||
}
|
||||
@@ -387,10 +389,13 @@ func (p *Peer) sendHello() error {
|
||||
|
||||
p.wsMu.Lock()
|
||||
defer p.wsMu.Unlock()
|
||||
return p.ws.WriteJSON(hello)
|
||||
if err := p.ws.WriteJSON(hello); err != nil {
|
||||
return fmt.Errorf("failed to send hello: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *Peer) handleSignaling() {
|
||||
func (p *Peer) handleSignaling(ctx context.Context) {
|
||||
pubSent := false
|
||||
|
||||
for {
|
||||
@@ -405,7 +410,7 @@ func (p *Peer) handleSignaling() {
|
||||
|
||||
p.wsMu.Lock()
|
||||
if p.ws != nil {
|
||||
p.ws.SetReadDeadline(time.Now().Add(60 * time.Second))
|
||||
_ = p.ws.SetReadDeadline(time.Now().Add(60 * time.Second))
|
||||
}
|
||||
p.wsMu.Unlock()
|
||||
|
||||
@@ -416,7 +421,7 @@ func (p *Peer) handleSignaling() {
|
||||
}
|
||||
|
||||
if serverHello, ok := msg["serverHello"].(map[string]interface{}); ok {
|
||||
p.startTelemetry(serverHello)
|
||||
p.startTelemetry(ctx, serverHello)
|
||||
p.sendAck(uid)
|
||||
}
|
||||
|
||||
@@ -467,7 +472,7 @@ func (p *Peer) handleSignaling() {
|
||||
}
|
||||
|
||||
p.wsMu.Lock()
|
||||
p.ws.WriteJSON(map[string]interface{}{
|
||||
_ = p.ws.WriteJSON(map[string]interface{}{
|
||||
"uid": uuid.New().String(),
|
||||
"subscriberSdpAnswer": map[string]interface{}{
|
||||
"pcSeq": int(pcSeq),
|
||||
@@ -491,7 +496,7 @@ func (p *Peer) handleSignaling() {
|
||||
}
|
||||
|
||||
p.wsMu.Lock()
|
||||
p.ws.WriteJSON(map[string]interface{}{
|
||||
_ = p.ws.WriteJSON(map[string]interface{}{
|
||||
"uid": uuid.New().String(),
|
||||
"publisherSdpOffer": map[string]interface{}{
|
||||
"pcSeq": 1,
|
||||
@@ -554,7 +559,7 @@ func (p *Peer) sendAck(uid string) {
|
||||
p.wsMu.Lock()
|
||||
defer p.wsMu.Unlock()
|
||||
|
||||
p.ws.WriteJSON(map[string]interface{}{
|
||||
_ = p.ws.WriteJSON(map[string]interface{}{
|
||||
"uid": uid,
|
||||
"ack": map[string]interface{}{
|
||||
"status": map[string]interface{}{
|
||||
@@ -615,13 +620,13 @@ func (p *Peer) sendPong(uid string) {
|
||||
p.wsMu.Lock()
|
||||
defer p.wsMu.Unlock()
|
||||
|
||||
p.ws.WriteJSON(map[string]interface{}{
|
||||
_ = p.ws.WriteJSON(map[string]interface{}{
|
||||
"uid": uid,
|
||||
"pong": map[string]interface{}{},
|
||||
})
|
||||
}
|
||||
|
||||
func (p *Peer) startTelemetry(serverHello map[string]interface{}) {
|
||||
func (p *Peer) startTelemetry(ctx context.Context, serverHello map[string]interface{}) {
|
||||
cfg, ok := serverHello["telemetryConfiguration"].(map[string]interface{})
|
||||
if !ok {
|
||||
return
|
||||
@@ -656,16 +661,16 @@ func (p *Peer) startTelemetry(serverHello map[string]interface{}) {
|
||||
ticker := time.NewTicker(interval)
|
||||
defer ticker.Stop()
|
||||
|
||||
p.sendTelemetry(endpoint, "join")
|
||||
p.sendTelemetry(ctx, endpoint, "join")
|
||||
for {
|
||||
select {
|
||||
case <-ticker.C:
|
||||
p.sendTelemetry(endpoint, "stats")
|
||||
p.sendTelemetry(ctx, endpoint, "stats")
|
||||
case <-p.telemetryCh:
|
||||
p.sendTelemetry(endpoint, "leave")
|
||||
p.sendTelemetry(ctx, endpoint, "leave")
|
||||
return
|
||||
case <-p.closeCh:
|
||||
p.sendTelemetry(endpoint, "leave")
|
||||
p.sendTelemetry(ctx, endpoint, "leave")
|
||||
return
|
||||
}
|
||||
}
|
||||
@@ -681,7 +686,7 @@ func (p *Peer) stopTelemetry() {
|
||||
}
|
||||
}
|
||||
|
||||
func (p *Peer) sendTelemetry(endpoint, event string) {
|
||||
func (p *Peer) sendTelemetry(ctx context.Context, endpoint, event string) {
|
||||
body, err := json.Marshal(map[string]interface{}{
|
||||
"event": event,
|
||||
"timestamp": time.Now().UnixMilli(),
|
||||
@@ -698,7 +703,7 @@ func (p *Peer) sendTelemetry(endpoint, event string) {
|
||||
return
|
||||
}
|
||||
|
||||
req, err := http.NewRequestWithContext(context.Background(), http.MethodPost, endpoint, bytes.NewReader(body))
|
||||
req, err := http.NewRequestWithContext(ctx, http.MethodPost, endpoint, bytes.NewReader(body))
|
||||
if err != nil {
|
||||
logger.Verbosef("Telemetry request skipped: %v", err)
|
||||
return
|
||||
@@ -772,7 +777,7 @@ func (p *Peer) setupICEHandlers() {
|
||||
|
||||
init := c.ToJSON()
|
||||
p.wsMu.Lock()
|
||||
p.ws.WriteJSON(map[string]interface{}{
|
||||
_ = p.ws.WriteJSON(map[string]interface{}{
|
||||
"uid": uuid.New().String(),
|
||||
"webrtcIceCandidate": map[string]interface{}{
|
||||
"candidate": init.Candidate,
|
||||
@@ -792,7 +797,7 @@ func (p *Peer) setupICEHandlers() {
|
||||
|
||||
init := c.ToJSON()
|
||||
p.wsMu.Lock()
|
||||
p.ws.WriteJSON(map[string]interface{}{
|
||||
_ = p.ws.WriteJSON(map[string]interface{}{
|
||||
"uid": uuid.New().String(),
|
||||
"webrtcIceCandidate": map[string]interface{}{
|
||||
"candidate": init.Candidate,
|
||||
@@ -823,13 +828,12 @@ func (p *Peer) sendLeave(uid string) bool {
|
||||
if err := p.ws.WriteJSON(leave); err != nil {
|
||||
log.Printf("Failed to send leave: %v", err)
|
||||
return false
|
||||
} else {
|
||||
log.Println("Sent leave message to server")
|
||||
}
|
||||
log.Println("Sent leave message to server")
|
||||
return true
|
||||
}
|
||||
|
||||
func (p *Peer) Close() error {
|
||||
func (p *Peer) Close() error { //nolint:revive
|
||||
log.Println("Closing peer connection...")
|
||||
|
||||
alreadyClosing := p.closed.Swap(true)
|
||||
@@ -893,7 +897,9 @@ func (p *Peer) Close() error {
|
||||
if p.ws != nil {
|
||||
log.Println("Closing WebSocket...")
|
||||
p.wsMu.Lock()
|
||||
_ = p.ws.WriteControl(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""), time.Now().Add(time.Second))
|
||||
_ = p.ws.WriteControl(websocket.CloseMessage,
|
||||
websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""),
|
||||
time.Now().Add(time.Second)) //nolint:lll
|
||||
_ = p.ws.Close()
|
||||
p.wsMu.Unlock()
|
||||
}
|
||||
@@ -968,7 +974,9 @@ func (p *Peer) reconnect(ctx context.Context) error {
|
||||
|
||||
if p.ws != nil {
|
||||
p.wsMu.Lock()
|
||||
_ = p.ws.WriteControl(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""), time.Now().Add(time.Second))
|
||||
_ = p.ws.WriteControl(websocket.CloseMessage,
|
||||
websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""),
|
||||
time.Now().Add(time.Second)) //nolint:lll
|
||||
_ = p.ws.Close()
|
||||
p.wsMu.Unlock()
|
||||
}
|
||||
@@ -994,15 +1002,15 @@ func (p *Peer) reconnect(ctx context.Context) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (p *Peer) SetReconnectCallback(cb func(*webrtc.DataChannel)) {
|
||||
func (p *Peer) SetReconnectCallback(cb func(*webrtc.DataChannel)) { //nolint:revive
|
||||
p.onReconnect = cb
|
||||
}
|
||||
|
||||
func (p *Peer) SetShouldReconnect(fn func() bool) {
|
||||
func (p *Peer) SetShouldReconnect(fn func() bool) { //nolint:revive
|
||||
p.shouldReconnect = fn
|
||||
}
|
||||
|
||||
func (p *Peer) WatchConnection(ctx context.Context) {
|
||||
func (p *Peer) WatchConnection(ctx context.Context) { //nolint:revive
|
||||
const maxReconnects = 10
|
||||
const reconnectWindow = 5 * time.Minute
|
||||
|
||||
@@ -1062,7 +1070,8 @@ func (p *Peer) processSendQueue(workerID int, sessionCloseCh <-chan struct{}) {
|
||||
continue
|
||||
}
|
||||
if p.trafficShape.MaxMessageSize > 0 && len(data) > p.trafficShape.MaxMessageSize {
|
||||
log.Printf("[WORKER-%d] Refusing oversized DataChannel message size=%d limit=%d", workerID, len(data), p.trafficShape.MaxMessageSize)
|
||||
log.Printf("[WORKER-%d] Refusing oversized DataChannel message size=%d limit=%d",
|
||||
workerID, len(data), p.trafficShape.MaxMessageSize) //nolint:lll
|
||||
continue
|
||||
}
|
||||
if delay := p.nextSendDelay(); delay > 0 {
|
||||
@@ -1127,7 +1136,8 @@ func (p *Peer) monitorQueue(sessionCloseCh <-chan struct{}) {
|
||||
buffered = p.dc.BufferedAmount()
|
||||
}
|
||||
if queueLen > 800 || buffered > 3*1024*1024 {
|
||||
log.Printf("[QUEUE_MONITOR] queue_len=%d dc_buffered=%d MB", queueLen, buffered/(1024*1024))
|
||||
log.Printf("[QUEUE_MONITOR] queue_len=%d dc_buffered=%d MB",
|
||||
queueLen, buffered/(1024*1024)) //nolint:lll
|
||||
}
|
||||
case <-sessionCloseCh:
|
||||
return
|
||||
@@ -1137,7 +1147,7 @@ func (p *Peer) monitorQueue(sessionCloseCh <-chan struct{}) {
|
||||
}
|
||||
}
|
||||
|
||||
func (p *Peer) CanSend() bool {
|
||||
func (p *Peer) CanSend() bool { //nolint:revive
|
||||
queueLen := len(p.sendQueue)
|
||||
buffered := uint64(0)
|
||||
if p.dc != nil {
|
||||
@@ -1155,5 +1165,6 @@ func (p *Peer) nextSendDelay() time.Duration {
|
||||
if maxDelay <= minDelay {
|
||||
return maxDelay
|
||||
}
|
||||
//nolint:gosec
|
||||
return minDelay + time.Duration(rand.Int64N(int64(maxDelay-minDelay)))
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user