diff --git a/internal/protect/protect.go b/internal/protect/protect.go index 24fc8d9..49c8340 100644 --- a/internal/protect/protect.go +++ b/internal/protect/protect.go @@ -65,6 +65,7 @@ func DialContext(ctx context.Context, network, address string) (net.Conn, error) // ProxyDialer implements golang.org/x/net/proxy.Dialer for pion ICE. type ProxyDialer struct{} +// Dial connects to the address on the named network using a protected socket. func (d *ProxyDialer) Dial(network, addr string) (net.Conn, error) { conn, err := NewDialer().Dial(network, addr) if err != nil { diff --git a/internal/server/server.go b/internal/server/server.go index 4bed582..d1fbaa0 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -25,13 +25,13 @@ import ( ) var ( - ErrKeySize = errors.New("key must be 32 bytes") //nolint:revive - ErrKeyStringLength = errors.New("key string length must be 32") //nolint:revive - ErrSocks5AuthFailed = errors.New("SOCKS5 auth failed") //nolint:revive - ErrSocks5ConnectFailed = errors.New("SOCKS5 connect failed") //nolint:revive - ErrNoPeers = errors.New("no peers available") //nolint:revive - ErrDialProxy = errors.New("failed to dial proxy") //nolint:revive - ErrEncryptFailed = errors.New("encrypt failed") //nolint:revive + ErrKeySize = errors.New("key must be 32 bytes") + ErrKeyStringLength = errors.New("key string length must be 32") + ErrSocks5AuthFailed = errors.New("SOCKS5 auth failed") + ErrSocks5ConnectFailed = errors.New("SOCKS5 connect failed") + ErrNoPeers = errors.New("no peers available") + ErrDialProxy = errors.New("failed to dial proxy") + ErrEncryptFailed = errors.New("encrypt failed") ) type Server struct { //nolint:revive @@ -66,7 +66,6 @@ func Run( socksProxyAddr string, socksProxyPort int, ) error { - runCtx, cancel := context.WithCancel(ctx) defer cancel() diff --git a/internal/telemost/peer.go b/internal/telemost/peer.go index dda9029..c74e4b8 100644 --- a/internal/telemost/peer.go +++ b/internal/telemost/peer.go @@ -29,10 +29,13 @@ const ( ) var ( + // ErrDataChannelTimeout is returned when the datachannel fails to open within the timeout. ErrDataChannelTimeout = errors.New("datachannel timeout") ErrDataChannelNotReady = errors.New("datachannel not ready") ErrSendQueueClosed = errors.New("send queue closed") ErrSendQueueTimeout = errors.New("send queue timeout") + ErrSessionClosed = errors.New("session closed") + ErrPeerClosed = errors.New("peer closed") ) type TrafficShape struct { //nolint:revive @@ -59,7 +62,6 @@ type Peer struct { //nolint:revive telemetryCh chan struct{} lastReconnect time.Time reconnectCount int - reconnectMu sync.Mutex sessionMu sync.Mutex sendQueue chan []byte sendQueueClosed atomic.Bool @@ -178,7 +180,7 @@ func (p *Peer) drainReconnectQueue() { } } -func (p *Peer) Connect(ctx context.Context) error { //nolint:revive,gocognit +func (p *Peer) Connect(ctx context.Context) error { //nolint:revive p.closed.Store(false) config := webrtc.Configuration{ @@ -205,7 +207,7 @@ func (p *Peer) Connect(ctx context.Context) error { //nolint:revive,gocognit } p.setupICEHandlers() - p.startBackgroundGoroutines(keepAliveCh) + p.startBackgroundGoroutines(ctx, keepAliveCh) select { case <-dcReady: @@ -319,7 +321,7 @@ func (p *Peer) dialWebSocket() error { return nil } -func (p *Peer) startBackgroundGoroutines(keepAliveCh chan struct{}) { +func (p *Peer) startBackgroundGoroutines(ctx context.Context, keepAliveCh chan struct{}) { p.wg.Add(1) go func() { defer p.wg.Done() @@ -331,7 +333,7 @@ func (p *Peer) startBackgroundGoroutines(keepAliveCh chan struct{}) { p.wg.Add(1) go func() { defer p.wg.Done() - p.handleSignaling(context.Background()) + p.handleSignaling(ctx) }() } @@ -402,7 +404,7 @@ func (p *Peer) sendHello() error { return nil } -func (p *Peer) handleSignaling(ctx context.Context) { //nolint:gocognit +func (p *Peer) handleSignaling(ctx context.Context) { //nolint:cyclop pubSent := false for { @@ -837,7 +839,7 @@ func (p *Peer) sendLeave(uid string) bool { return true } -func (p *Peer) Close() error { //nolint:revive,cyclop +func (p *Peer) Close() error { //nolint:revive log.Println("Closing peer...") alreadyClosing := p.closed.Swap(true) p.sendQueueClosed.Store(true) @@ -993,7 +995,7 @@ func (p *Peer) SetShouldReconnect(fn func() bool) { //nolint:revive p.shouldReconnect = fn } -func (p *Peer) WatchConnection(ctx context.Context) { //nolint:revive,gocognit +func (p *Peer) WatchConnection(ctx context.Context) { //nolint:revive,gocognit,cyclop const maxReconnects = 10 const reconnectWindow = 5 * time.Minute @@ -1039,7 +1041,7 @@ func (p *Peer) WatchConnection(ctx context.Context) { //nolint:revive,gocognit } } -func (p *Peer) processSendQueue(workerID int, sessionCloseCh <-chan struct{}) { //nolint:revive,gocognit +func (p *Peer) processSendQueue(workerID int, sessionCloseCh <-chan struct{}) { //nolint:gocognit for { select { case <-sessionCloseCh: @@ -1079,9 +1081,9 @@ func (p *Peer) waitBufferedAmount(workerID int, sessionCloseCh <-chan struct{}) for p.dc.BufferedAmount() > 512*1024 { select { case <-sessionCloseCh: - return 0, fmt.Errorf("session closed") + return 0, ErrSessionClosed case <-p.closeCh: - return 0, fmt.Errorf("peer closed") + return 0, ErrPeerClosed case <-time.After(10 * time.Millisecond): if time.Since(start) > 5*time.Second { log.Printf("[WORKER-%d] Buffer wait timeout", workerID)