From efadd37df4a1ac1f1ec18c5a0d198f17aa733644 Mon Sep 17 00:00:00 2001 From: zarazaex69 Date: Sat, 30 May 2026 12:16:20 +0300 Subject: [PATCH] fix: golangci --- internal/engine/jitsi/jitsi.go | 74 ++++++++++++++++++++-------------- internal/server/server.go | 22 ++++++---- 2 files changed, 58 insertions(+), 38 deletions(-) diff --git a/internal/engine/jitsi/jitsi.go b/internal/engine/jitsi/jitsi.go index 51485d4..a42cf65 100644 --- a/internal/engine/jitsi/jitsi.go +++ b/internal/engine/jitsi/jitsi.go @@ -509,7 +509,7 @@ func (s *Session) negotiatePC(ctx context.Context, jSess *j.Session, sctpBridge // closes that race window - any source-add Jicofo emits is picked up // the instant it lands on the wire. s.wg.Add(1) - trickleCtx, trickleCancel := context.WithCancel(context.Background()) + trickleCtx, trickleCancel := context.WithCancel(ctx) s.trickleCancel = trickleCancel go s.trickleDrainLoop(trickleCtx, pc, neg, jSess.LowLevel().Stanzas()) @@ -599,7 +599,9 @@ func (s *Session) rtcpKeepalive(pc *webrtc.PeerConnection) { // Incoming source-add stanzas (announcing other participants' SSRCs) are // merged into the remote SDP via neg.HandleSourceAdd so pion can route the // inbound RTP through OnTrack. -func (s *Session) trickleDrainLoop(ctx context.Context, pc *webrtc.PeerConnection, neg negotiator, stanzas <-chan string) { +func (s *Session) trickleDrainLoop( + ctx context.Context, pc *webrtc.PeerConnection, neg negotiator, stanzas <-chan string, +) { defer s.wg.Done() for { select { @@ -1209,19 +1211,7 @@ func (s *Session) reconnect(ctx context.Context) error { defer s.reconnecting.Store(false) s.bridgeReady.Store(false) - - // Close PC only - keep the XMPP session alive. - s.pcMu.Lock() - oldPC := s.pc - s.pc = nil - s.pcMu.Unlock() - if s.trickleCancel != nil { - s.trickleCancel() - s.trickleCancel = nil - } - if oldPC != nil { - _ = oldPC.Close() - } + s.teardownPC() s.localEpoch.Store(randomEpoch()) s.peerEpoch.Store(0) @@ -1249,22 +1239,8 @@ func (s *Session) reconnect(ctx context.Context) error { return s.reconnectFull(ctx) } - // Got session-initiate - negotiate PC and open bridge. - sctpBridge := jSess.ColibriWS == "" - if err := s.negotiatePC(ctx, jSess, sctpBridge); err != nil { - logger.Warnf("jitsi: negotiate after reinitiate failed: %v - full reconnect", err) - return s.reconnectFull(ctx) - } - if sctpBridge { - if err := s.openBridgeSCTP(ctx, jSess); err != nil { - logger.Warnf("jitsi: bridge after reinitiate failed: %v - full reconnect", err) - return s.reconnectFull(ctx) - } - } else { - if err := s.openBridgeWS(ctx, jSess); err != nil { - logger.Warnf("jitsi: bridge after reinitiate failed: %v - full reconnect", err) - return s.reconnectFull(ctx) - } + if err := s.reinitiateBridge(ctx, jSess); err != nil { + return err } s.peerEndpoint.Store(nil) @@ -1284,6 +1260,42 @@ func (s *Session) reconnect(ctx context.Context) error { return nil } +// teardownPC closes the current PeerConnection and cancels the trickle loop. +func (s *Session) teardownPC() { + s.pcMu.Lock() + oldPC := s.pc + s.pc = nil + s.pcMu.Unlock() + if s.trickleCancel != nil { + s.trickleCancel() + s.trickleCancel = nil + } + if oldPC != nil { + _ = oldPC.Close() + } +} + +// reinitiateBridge negotiates a new PeerConnection and opens the bridge channel. +func (s *Session) reinitiateBridge(ctx context.Context, jSess *j.Session) error { + sctpBridge := jSess.ColibriWS == "" + if err := s.negotiatePC(ctx, jSess, sctpBridge); err != nil { + logger.Warnf("jitsi: negotiate after reinitiate failed: %v - full reconnect", err) + return s.reconnectFull(ctx) + } + if sctpBridge { + if err := s.openBridgeSCTP(ctx, jSess); err != nil { + logger.Warnf("jitsi: bridge after reinitiate failed: %v - full reconnect", err) + return s.reconnectFull(ctx) + } + } else { + if err := s.openBridgeWS(ctx, jSess); err != nil { + logger.Warnf("jitsi: bridge after reinitiate failed: %v - full reconnect", err) + return s.reconnectFull(ctx) + } + } + return nil +} + // reconnectFull tears down everything and does a full rejoin (blocking on session-initiate). func (s *Session) reconnectFull(ctx context.Context) error { if old := s.jSess.Swap(nil); old != nil { diff --git a/internal/server/server.go b/internal/server/server.go index 5c07cad..2d78d09 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -523,15 +523,9 @@ func (s *Server) serveSingle(ctx context.Context) { stream, err := sess.AcceptStream() if err != nil { - if contextDone(ctx) { + if s.handleAcceptError(ctx, sess) { return } - hadSession := s.handshakeReady() - logger.Debugf("AcceptStream returned %v - reinstalling session", err) - s.reinstallSession(sess) - if hadSession && s.ln != nil { - s.ln.Reconnect("liveness") - } continue } @@ -543,6 +537,20 @@ func (s *Server) serveSingle(ctx context.Context) { } } +// handleAcceptError handles a failed AcceptStream. Returns true if the server should stop. +func (s *Server) handleAcceptError(ctx context.Context, sess *smux.Session) bool { + if contextDone(ctx) { + return true + } + hadSession := s.handshakeReady() + logger.Debugf("AcceptStream returned error - reinstalling session") + s.reinstallSession(sess) + if hadSession && s.ln != nil { + s.ln.Reconnect("liveness") + } + return false +} + func (s *Server) currentSessionID() string { s.sessMu.RLock() defer s.sessMu.RUnlock()