From 1f6571d1c18ff83f35cc78025967f365dc94d791 Mon Sep 17 00:00:00 2001 From: sbramn Date: Fri, 5 Jun 2026 20:01:45 +0300 Subject: [PATCH] fix(jitsi): keep negotiation while avoiding idle rtcp --- internal/engine/jitsi/jitsi.go | 24 ++++++++++++++++++------ internal/engine/jitsi/jitsi_test.go | 18 ++++++++++++++---- 2 files changed, 32 insertions(+), 10 deletions(-) diff --git a/internal/engine/jitsi/jitsi.go b/internal/engine/jitsi/jitsi.go index 4dc7e8f..56a66bf 100644 --- a/internal/engine/jitsi/jitsi.go +++ b/internal/engine/jitsi/jitsi.go @@ -379,7 +379,7 @@ func (s *Session) completeJingleSetup(ctx context.Context, jSess *j.Session) err } } - if s.shouldNegotiatePC(sctpBridge) { + if s.shouldNegotiatePC(needBridge) { if err := s.negotiatePC(ctx, jSess, sctpBridge); err != nil { return err } @@ -425,8 +425,8 @@ func (s *Session) openBridgeSCTP(ctx context.Context, jSess *j.Session) error { return nil } -func (s *Session) shouldNegotiatePC(sctpBridge bool) bool { - return sctpBridge || s.shouldRequestVideo() +func (s *Session) shouldNegotiatePC(needBridge bool) bool { + return needBridge || s.shouldRequestVideo() } func (s *Session) shouldRequestVideo() bool { @@ -460,7 +460,7 @@ func (s *Session) videoTrackHandler() func(*webrtc.TrackRemote, *webrtc.RTPRecei // belongs to the same logical operation, so splitting it into helpers // would obscure the wire order rather than clarify it. // -//nolint:cyclop // sequential Jingle negotiation steps; refactoring would hide ordering +//nolint:cyclop,gocognit // sequential Jingle negotiation steps; refactoring would hide ordering func (s *Session) negotiatePC(ctx context.Context, jSess *j.Session, sctpBridge bool) error { settings := webrtc.SettingEngine{} settings.LoggerFactory = logger.NewPionLoggerFactory() @@ -613,7 +613,14 @@ func (s *Session) negotiatePC(ctx context.Context, jSess *j.Session, sctpBridge pcCtx := s.pcCtx s.pcMu.Unlock() - // Start an RTCP keepalive. JVB tracks endpoint liveness via + // Start an RTCP keepalive only when the PC carries media or the SCTP bridge + // fallback. colibri-ws byte streams keep the bridge alive separately and do + // not need a 5-second RTCP tick while idle. + if !shouldRunRTCPKeepalive(sctpBridge, requestVideo) { + return nil + } + + // JVB tracks endpoint liveness via // lastIncomingActivityInstant = max(lastRtpReceived, lastIceConsent). // In a TURN-relay-only path, ICE consent updates can fail to reach // JVB's lastIceActivityInstant tracker. Periodic RTCP RR packets @@ -626,6 +633,10 @@ func (s *Session) negotiatePC(ctx context.Context, jSess *j.Session, sctpBridge return nil } +func shouldRunRTCPKeepalive(sctpBridge, requestVideo bool) bool { + return sctpBridge || requestVideo +} + // negotiator is the subset of *peer.Negotiator we need. Defined as an // interface here because peer is in j's internal/ tree and not importable. type negotiator interface { @@ -1172,6 +1183,7 @@ func (s *Session) acceptPeerEpochFrame(from string, payload []byte) ([]byte, boo return payload[off+epochHeaderLen:], true } +//nolint:cyclop // epoch filtering has several explicit drop cases func (s *Session) acceptEpochFrame(payload []byte) ([]byte, bool) { const epochHeaderLen = 8 if len(payload) < len(bridgeMagic)+epochHeaderLen { @@ -1625,7 +1637,7 @@ func (s *Session) teardownPC() { func (s *Session) reinitiateBridge(ctx context.Context, jSess *j.Session) error { needBridge := s.onData != nil || s.onPeerData != nil sctpBridge := needBridge && jSess.ColibriWS == "" - if s.shouldNegotiatePC(sctpBridge) { + if s.shouldNegotiatePC(needBridge) { if err := s.negotiatePC(ctx, jSess, sctpBridge); err != nil { logger.Warnf("jitsi: negotiate after reinitiate failed: %v - full reconnect", err) return s.reconnectFull(ctx) diff --git a/internal/engine/jitsi/jitsi_test.go b/internal/engine/jitsi/jitsi_test.go index 90e6ced..4d1b770 100644 --- a/internal/engine/jitsi/jitsi_test.go +++ b/internal/engine/jitsi/jitsi_test.go @@ -93,7 +93,7 @@ func TestNewSucceeds(t *testing.T) { } } -func TestByteStreamWebSocketSkipsPeerConnectionWithoutRequestingVideo(t *testing.T) { +func TestByteStreamWebSocketNegotiatesPeerConnectionWithoutRTCPKeepalive(t *testing.T) { sess, err := New(context.Background(), engine.Config{ URL: testHost, Extra: map[string]string{credentialKeyRoom: testRoom}, @@ -108,15 +108,18 @@ func TestByteStreamWebSocketSkipsPeerConnectionWithoutRequestingVideo(t *testing if !ok { t.Fatal("sess is not *Session") } - if js.shouldNegotiatePC(false) { - t.Fatal("shouldNegotiatePC(false) = true for websocket bytestream session") + if !js.shouldNegotiatePC(true) { + t.Fatal("shouldNegotiatePC(true) = false for websocket bytestream session") } if js.shouldRequestVideo() { t.Fatal("shouldRequestVideo() = true for bytestream-only session") } + if shouldRunRTCPKeepalive(false, js.shouldRequestVideo()) { + t.Fatal("shouldRunRTCPKeepalive(false, false) = true for websocket bytestream session") + } } -func TestByteStreamSCTPFallbackNegotiatesPeerConnectionWithoutRequestingVideo(t *testing.T) { +func TestByteStreamSCTPFallbackNegotiatesPeerConnectionWithRTCPKeepalive(t *testing.T) { sess, err := New(context.Background(), engine.Config{ URL: testHost, Extra: map[string]string{credentialKeyRoom: testRoom}, @@ -137,6 +140,9 @@ func TestByteStreamSCTPFallbackNegotiatesPeerConnectionWithoutRequestingVideo(t if js.shouldRequestVideo() { t.Fatal("shouldRequestVideo() = true for bytestream-only session") } + if !shouldRunRTCPKeepalive(true, js.shouldRequestVideo()) { + t.Fatal("shouldRunRTCPKeepalive(true, false) = false for SCTP bytestream fallback") + } } func TestVideoSessionNegotiatesPeerConnectionAndRequestsVideo(t *testing.T) { @@ -165,6 +171,9 @@ func TestVideoSessionNegotiatesPeerConnectionAndRequestsVideo(t *testing.T) { if !js.shouldRequestVideo() { t.Fatal("shouldRequestVideo() = false for video session") } + if !shouldRunRTCPKeepalive(false, js.shouldRequestVideo()) { + t.Fatal("shouldRunRTCPKeepalive(false, true) = false for video session") + } } func TestSendBeforeConnect(t *testing.T) { @@ -343,6 +352,7 @@ func TestReconnectEpochAnnounceWithZeroPeerEpochIsAccepted(t *testing.T) { } } +//nolint:cyclop // setup asserts latch, epoch, and delivery state func TestRequireTargetedPeerDropsOtherClientBroadcastBeforeLatch(t *testing.T) { var received [][]byte sess, err := New(context.Background(), engine.Config{