From cc65c033e2d794685337fb80edbbbc448d4dc5e4 Mon Sep 17 00:00:00 2001
From: zarazaex69
Date: Mon, 1 Jun 2026 12:51:06 +0300
Subject: [PATCH] fix(jitsi): prevent reconnect loops and idle
---
internal/engine/jitsi/jitsi.go | 226 +++++++++++--
.../jitsi/keepalive_integration_test.go | 291 ++++++++++++++++
internal/engine/jitsi/keepalive_test.go | 316 ++++++++++++++++++
3 files changed, 811 insertions(+), 22 deletions(-)
create mode 100644 internal/engine/jitsi/keepalive_integration_test.go
create mode 100644 internal/engine/jitsi/keepalive_test.go
diff --git a/internal/engine/jitsi/jitsi.go b/internal/engine/jitsi/jitsi.go
index 3336dae..4ed19dd 100644
--- a/internal/engine/jitsi/jitsi.go
+++ b/internal/engine/jitsi/jitsi.go
@@ -50,6 +50,23 @@ const (
videoTrackName = "videochannel"
maxReconnects = 5
reconnectWindow = 5 * time.Minute
+ // reconnectGrace is the window after a successful self-reconnect during
+ // which incoming peer-epoch changes do NOT trigger another reconnect.
+ // Without this, the peer's own recovery (which produces a fresh epoch)
+ // drives us into an infinite reconnect loop.
+ reconnectGrace = 20 * time.Second
+ // stableUptime is how long the bridge must stay healthy before the
+ // reconnectCount is reset. Without this, healthy reconnects accumulated
+ // over hours of operation eventually cross maxReconnects and the engine
+ // gives up on a perfectly recoverable failure.
+ stableUptime = 60 * time.Second
+ // xmppKeepaliveInterval keeps the underlying XMPP transport alive while
+ // we wait for a peer. BOSH has no built-in stream management; without
+ // any application traffic Prosody closes the BOSH session after roughly
+ // 60 s and our subsequent WaitJingle observes "connection closed". A
+ // periodic XMPP ping IQ resets that idle timer end-to-end and works for
+ // the WebSocket transport too.
+ xmppKeepaliveInterval = 25 * time.Second
)
// bridgeMagic tags every EndpointMessage produced by this engine. JVB broadcasts
@@ -91,8 +108,10 @@ type Session struct {
jSess atomic.Pointer[j.Session]
- pcMu sync.Mutex
- pc *webrtc.PeerConnection
+ pcMu sync.Mutex
+ pc *webrtc.PeerConnection
+ pcCtx context.Context //nolint:containedctx // tied to PC lifetime, cancelled in teardownPC
+ pcCancel context.CancelFunc // cancels pcCtx; cancelled when the live PC is replaced
sendQueue chan []byte
peerSendQueue chan bridgeOutbound
@@ -101,11 +120,16 @@ type Session struct {
reconnecting atomic.Bool
reconnectCh chan struct{}
- reconnectMu sync.Mutex // guards reconnectWindowStart and reconnectCount
+ reconnectMu sync.Mutex // guards reconnectWindowStart, reconnectCount, lastReconnectAt
reconnectWindowStart time.Time
reconnectCount int
- localEpoch atomic.Uint32
- peerEpoch atomic.Uint32
+ // lastReconnectAt records when the last successful self-reconnect completed.
+ // During the grace period after a reconnect, peer-epoch changes are tolerated
+ // without triggering yet another reconnect (the peer is also recovering and
+ // will publish a fresh epoch as part of its own recovery).
+ lastReconnectAt atomic.Int64
+ localEpoch atomic.Uint32
+ peerEpoch atomic.Uint32
// peerEndpoint latches the MUC nick of the first occupant whose
// EndpointMessage passed the bridgeMagic check. Once set, all bridge
@@ -293,16 +317,24 @@ func (s *Session) Connect(ctx context.Context) error {
s.jSess.Store(jSess)
logger.Infof("jitsi: MUC joined %s/%s; waiting for peer …", s.host, s.room)
- s.wg.Add(4)
+ s.wg.Add(5)
go s.sendLoop()
go s.recvLoop()
go s.waitForJingle()
go s.bridgeKeepalive()
+ go s.xmppKeepalive()
return nil
}
// waitForJingle waits for Jicofo to send session-initiate (when a peer joins)
// and then opens the bridge channel and negotiates the PeerConnection.
+//
+// Jicofo only emits session-initiate once min-participants is reached
+// (default 2). If we sit alone in the room long enough, the underlying
+// XMPP transport may also drop (BOSH session timeout, connection reset,
+// network blip, etc.). On any non-cancellation error we request a
+// reconnect so the supervisor can rejoin and resume waiting; without
+// this, a single failed wait permanently wedges the engine.
func (s *Session) waitForJingle() {
defer s.wg.Done()
@@ -317,6 +349,7 @@ func (s *Session) waitForJingle() {
return
}
logger.Warnf("jitsi: wait jingle failed: %v", err)
+ s.requestReconnect("wait jingle failed: " + err.Error())
return
}
_ = stanza // parsed below via joinAndOpenBridge path
@@ -615,6 +648,14 @@ func (s *Session) negotiatePC(ctx context.Context, jSess *j.Session, sctpBridge
s.pcMu.Lock()
s.pc = pc
+ // Build a context that lives exactly as long as this PC instance.
+ // teardownPC cancels pcCancel so any goroutines bound to pcCtx
+ // (currently rtcpKeepalive) exit before a fresh PC takes its place.
+ if s.pcCancel != nil {
+ s.pcCancel()
+ }
+ s.pcCtx, s.pcCancel = context.WithCancel(s.runCtx)
+ pcCtx := s.pcCtx
s.pcMu.Unlock()
// Start an RTCP keepalive. JVB tracks endpoint liveness via
@@ -625,7 +666,7 @@ func (s *Session) negotiatePC(ctx context.Context, jSess *j.Session, sctpBridge
// after the default 1-minute inactivity timeout, which causes JVB to
// shut down the DTLS session and emit close_notify.
s.wg.Add(1)
- go s.rtcpKeepalive(pc)
+ go s.rtcpKeepalive(pcCtx, pc)
return nil
}
@@ -643,7 +684,13 @@ type negotiator interface {
// more than the configured inactivityTimeout (default 1 minute). Even an
// empty RR keeps the timestamp fresh - JVB does not require the report to
// reference any specific SSRC.
-func (s *Session) rtcpKeepalive(pc *webrtc.PeerConnection) {
+//
+// pcCtx is bound to the lifetime of pc: when teardownPC closes pc as part of
+// a reconnect, pcCtx is cancelled and this loop exits cleanly. Without that
+// binding, the loop would keep ticking after pc.Close(), accumulate write
+// errors against the dead PC, and fire a duplicate "rtcp keepalive dead"
+// reconnect that competes with the in-progress reconnect supervisor.
+func (s *Session) rtcpKeepalive(pcCtx context.Context, pc *webrtc.PeerConnection) {
defer s.wg.Done()
const interval = 5 * time.Second
const maxErrors = 3
@@ -655,9 +702,14 @@ func (s *Session) rtcpKeepalive(pc *webrtc.PeerConnection) {
select {
case <-s.done:
return
+ case <-pcCtx.Done():
+ return
case <-ticker.C:
+ if pcCtx.Err() != nil {
+ return
+ }
if err := pc.WriteRTCP(pkts); err != nil {
- if s.closed.Load() {
+ if s.closed.Load() || pcCtx.Err() != nil {
return
}
errCount++
@@ -674,10 +726,23 @@ func (s *Session) rtcpKeepalive(pc *webrtc.PeerConnection) {
}
}
-// bridgeKeepalive sends a lightweight colibri-ws message every 10 seconds so
-// JVB updates its endpoint lastActivity timestamp. Without this, JVB expires
-// the endpoint after its inactivity timeout (~30-60s) when the ICE/DTLS path
-// is routed through a TURN relay whose allocation silently dies.
+// bridgeKeepalive sends a lightweight bridge frame every 10 seconds so JVB
+// updates its endpoint lastActivity timestamp. Without this, JVB expires the
+// endpoint after its inactivity timeout (~30-60s) when the ICE/DTLS path is
+// routed through a TURN relay whose allocation silently dies.
+//
+// The frame is a normal olcrtc bridge frame with an empty payload: the
+// recipient's acceptEpochFrame returns 0 bytes, deliverBridgeMessage drops
+// it before invoking onData, and the wire is exactly len(magic)+8 bytes
+// (well under JVB's 16 KiB max-message-size). This works for both transports
+// JVB exposes:
+//
+// - colibri-ws: BridgeSendRaw serialises through Bridge().SendRaw.
+// - SCTP: BridgeSendRaw writes onto the data channel directly.
+//
+// Previous implementation called jSess.Bridge().SendJSON (a colibri control
+// message) which is nil for SCTP-only deployments; that left SCTP bridges
+// without any keepalive at all, so JVB silently expired the endpoint.
func (s *Session) bridgeKeepalive() {
defer s.wg.Done()
const interval = 10 * time.Second
@@ -688,18 +753,83 @@ func (s *Session) bridgeKeepalive() {
case <-s.done:
return
case <-ticker.C:
+ if !s.bridgeReady.Load() {
+ continue
+ }
jSess := s.jSess.Load()
if jSess == nil {
continue
}
- br := jSess.Bridge()
- if br == nil {
+ frame, err := s.encodeBridgeFrame(nil, "")
+ if err != nil {
continue
}
- _ = br.SendJSON(map[string]any{
- "colibriClass": "PinnedEndpointsChangedEvent",
- "pinnedEndpoints": []string{},
- })
+ if err := jSess.BridgeSendRaw("", frame); err != nil {
+ logger.Debugf("jitsi: bridge keepalive send: %v", err)
+ }
+ }
+ }
+}
+
+// xmppKeepalive periodically sends an XMPP ping IQ so that the underlying
+// transport (WebSocket or BOSH) keeps observing application traffic.
+//
+// Why we need it: Prosody's BOSH plugin defaults to bosh_max_inactivity=60s
+// (and Jitsi's docker images set it explicitly to 60s on visitor domains).
+// Once the inactivity timer expires Prosody returns
+// and our long-poll fails with "connection closed" — exactly the symptom
+// observed when nobody else joins the room within 60s. A 25s ping cadence
+// keeps the BOSH session pinned with comfortable margin.
+//
+// Why a ping rather than presence: pings round-trip through the IQ pipeline
+// already exercised by the j library, are cheap on the server side, and
+// can't be confused for a participant state change by Jicofo. Presence
+// updates would also work but their side-effects are harder to reason about.
+//
+// Lifecycle: the loop runs for the whole engine lifetime. If a send fails,
+// we surface a reconnect request but DO NOT exit — the supervisor swaps in
+// a fresh jSess and the next tick picks it up via s.jSess.Load(). Without
+// that property, keepalive would silently die on the first network blip
+// and BOSH would expire 60s into the next idle window.
+func (s *Session) xmppKeepalive() {
+ defer s.wg.Done()
+ ticker := time.NewTicker(xmppKeepaliveInterval)
+ defer ticker.Stop()
+ var lastReconnectRequestErr string
+ for {
+ select {
+ case <-s.done:
+ return
+ case <-ticker.C:
+ jSess := s.jSess.Load()
+ if jSess == nil {
+ continue
+ }
+ conn := jSess.LowLevel()
+ if conn == nil {
+ continue
+ }
+ id := conn.NextID()
+ ping := fmt.Sprintf(
+ ``,
+ conn.Host(), id,
+ )
+ if err := conn.Send(ping); err != nil {
+ if s.closed.Load() {
+ return
+ }
+ logger.Debugf("jitsi: xmpp keepalive send: %v", err)
+ // Avoid spamming the supervisor with identical
+ // requests during the reconnect; once a request
+ // is enqueued the channel is buffered to depth 1,
+ // but we still skip the call to keep logs quiet.
+ if reason := err.Error(); reason != lastReconnectRequestErr {
+ s.requestReconnect("xmpp keepalive: " + reason)
+ lastReconnectRequestErr = reason
+ }
+ continue
+ }
+ lastReconnectRequestErr = ""
}
}
}
@@ -1107,13 +1237,36 @@ func (s *Session) acceptEpochFrame(payload []byte) ([]byte, bool) {
s.peerEpoch.Store(senderEpoch)
} else if prev != senderEpoch {
if s.peerEpoch.CompareAndSwap(prev, senderEpoch) {
- s.requestReconnect("jitsi peer epoch changed")
+ // Don't churn into another reconnect if we just finished
+ // one ourselves: the peer is publishing a fresh epoch as
+ // part of its own recovery, which is precisely how the
+ // loop "we reconnect → peer reconnects → we reconnect …"
+ // gets started. Inside the grace window we only update
+ // the latch so future frames decode against the new
+ // epoch and ignore the change as a reconnect trigger.
+ if !s.inReconnectGrace() {
+ s.requestReconnect("jitsi peer epoch changed")
+ } else {
+ logger.Debugf("jitsi: peer epoch changed during grace period, no reconnect")
+ }
}
return nil, false
}
return payload[off+epochHeaderLen:], true
}
+// inReconnectGrace reports whether we are still within reconnectGrace of
+// the last successful self-reconnect. During this window peer-epoch
+// transitions are absorbed silently rather than triggering a fresh
+// reconnect.
+func (s *Session) inReconnectGrace() bool {
+ last := s.lastReconnectAt.Load()
+ if last == 0 {
+ return false
+ }
+ return time.Since(time.Unix(0, last)) < reconnectGrace
+}
+
// peerLatchAccepts implements the peer-latch logic: the first sender whose
// payload survived the magic check becomes our partner; everyone else is
// ignored. Cleared on reconnect by the supervisor (peerEndpoint is reset
@@ -1189,7 +1342,13 @@ func (s *Session) Close() error {
s.pcMu.Lock()
pc := s.pc
s.pc = nil
+ pcCancel := s.pcCancel
+ s.pcCancel = nil
+ s.pcCtx = nil
s.pcMu.Unlock()
+ if pcCancel != nil {
+ pcCancel()
+ }
if pc != nil {
_ = pc.Close()
}
@@ -1280,7 +1439,16 @@ func (s *Session) requestReconnect(reason string) {
func (s *Session) handleReconnectAttempt(ctx context.Context) bool {
now := time.Now()
s.reconnectMu.Lock()
- if s.reconnectWindowStart.IsZero() || now.Sub(s.reconnectWindowStart) > reconnectWindow {
+ // Reset the reconnect counter once the bridge has been stable for
+ // stableUptime since the previous reconnect: long-running sessions
+ // will collect occasional churn-driven reconnects (peer leaves,
+ // JVB restart, etc.) which, without this reset, accumulate over
+ // hours and eventually trip maxReconnects on a perfectly recoverable
+ // failure. Falling back to the older window-based reset keeps the
+ // safety net for tight reconnect storms.
+ last := s.lastReconnectAt.Load()
+ stable := last != 0 && now.Sub(time.Unix(0, last)) >= stableUptime
+ if stable || s.reconnectWindowStart.IsZero() || now.Sub(s.reconnectWindowStart) > reconnectWindow {
s.reconnectWindowStart = now
s.reconnectCount = 0
}
@@ -1385,16 +1553,29 @@ func (s *Session) reconnect(ctx context.Context) error {
if s.onReconnect != nil {
s.onReconnect(nil)
}
+ s.lastReconnectAt.Store(time.Now().UnixNano())
logger.Infof("jitsi: reconnected %s/%s (reinitiate); colibri-ws=%s", s.host, s.room, jSess.ColibriWS)
return nil
}
-// teardownPC closes the current PeerConnection and cancels the trickle loop.
+// teardownPC closes the current PeerConnection, cancels any goroutines
+// bound to its lifetime (rtcpKeepalive), and clears trickle state.
+//
+// Cancelling pcCtx before pc.Close() lets the rtcpKeepalive goroutine exit
+// via its <-pcCtx.Done() branch instead of getting tripped by a write
+// failure against a closing PC and racing the supervisor with a duplicate
+// "rtcp keepalive dead" reconnect request.
func (s *Session) teardownPC() {
s.pcMu.Lock()
oldPC := s.pc
s.pc = nil
+ pcCancel := s.pcCancel
+ s.pcCancel = nil
+ s.pcCtx = nil
s.pcMu.Unlock()
+ if pcCancel != nil {
+ pcCancel()
+ }
if s.trickleCancel != nil {
s.trickleCancel()
s.trickleCancel = nil
@@ -1454,6 +1635,7 @@ func (s *Session) reconnectFull(ctx context.Context) error {
if s.onReconnect != nil {
s.onReconnect(nil)
}
+ s.lastReconnectAt.Store(time.Now().UnixNano())
logger.Infof("jitsi: reconnected %s/%s (full); colibri-ws=%s", s.host, s.room, jSess.ColibriWS)
return nil
}
diff --git a/internal/engine/jitsi/keepalive_integration_test.go b/internal/engine/jitsi/keepalive_integration_test.go
new file mode 100644
index 0000000..6d591f4
--- /dev/null
+++ b/internal/engine/jitsi/keepalive_integration_test.go
@@ -0,0 +1,291 @@
+// Real-server keepalive stress tests. These exercise the engine against a
+// live Jitsi deployment and verify that:
+//
+// 1. The XMPP transport stays alive past Prosody's BOSH 60s idle timeout
+// (bosh_max_inactivity in jitsi-meet.cfg.lua), i.e. our xmppKeepalive
+// goroutine actually keeps the long-poll session pinned. Without the
+// fix, WaitJingle returns "connection closed" exactly once per 60s.
+//
+// 2. Idle wait does not wedge the engine: after 90s alone in the room
+// we are still able to issue Send/CanSend without ErrSessionClosed.
+//
+// Both tests are gated behind an env variable so the package's regular
+// `go test` workflow stays hermetic and fast. To run them locally:
+//
+// OLCRTC_JITSI_KEEPALIVE_HOST=meet.handyweb.org \
+// OLCRTC_JITSI_KEEPALIVE_ROOM=olcrtc-stress-$(date +%s) \
+// go test -count=1 -v -timeout 5m \
+// -run '^TestJitsiKeepalive' ./internal/engine/jitsi/...
+//
+// Reuse the same room name across runs sparingly: jicofo treats each room
+// as a focus session and may take a few seconds to garbage-collect after
+// the previous run leaves.
+
+package jitsi
+
+import (
+ "context"
+ "errors"
+ "fmt"
+ "os"
+ "strings"
+ "testing"
+ "time"
+
+ "github.com/openlibrecommunity/olcrtc/internal/engine"
+)
+
+const (
+ envKeepaliveHost = "OLCRTC_JITSI_KEEPALIVE_HOST"
+ envKeepaliveRoom = "OLCRTC_JITSI_KEEPALIVE_ROOM"
+)
+
+func skipIfNoRealHost(t *testing.T) (host, room string) {
+ t.Helper()
+ host = strings.TrimSpace(os.Getenv(envKeepaliveHost))
+ if host == "" {
+ t.Skipf("set %s to a real Jitsi host (e.g. meet.handyweb.org) to enable", envKeepaliveHost)
+ }
+ room = strings.TrimSpace(os.Getenv(envKeepaliveRoom))
+ if room == "" {
+ room = fmt.Sprintf("olcrtc-keepalive-%d", time.Now().UnixNano())
+ }
+ return host, room
+}
+
+// TestJitsiKeepaliveSurvivesProsodyBOSHIdle is the canary for the BOSH
+// inactivity timeout regression: prior to the keepalive fix, joining a
+// real Jitsi room and idling for 90 seconds always failed with the j
+// library reporting "connection closed" because Prosody's BOSH module had
+// expired the long-poll session.
+//
+// We deliberately do NOT call sess.Connect because Connect attempts a full
+// j.JoinMUC which is more flaky against unknown deployments than a
+// minimum-viable smoke. Instead, we exercise the keepalive paths under
+// realistic conditions by:
+//
+// - Constructing a Session and JoinMUC-ing through the j library directly.
+// - Storing the result so the engine's keepalive goroutines (started by
+// Connect) would see jSess.Load() == this session.
+// - Spinning the keepalive in-place against the live LowLevel() conn.
+// - Verifying after 90 s that conn.Send still succeeds — which is exactly
+// what Prosody's BOSH inactivity timer kills without the fix.
+//
+// Test takes ~95 s on a clean run, so it's gated behind an env var.
+func TestJitsiKeepaliveSurvivesProsodyBOSHIdle(t *testing.T) {
+ host, room := skipIfNoRealHost(t)
+
+ const idle = 90 * time.Second
+
+ ctx, cancel := context.WithTimeout(context.Background(), idle+30*time.Second)
+ defer cancel()
+
+ sess, err := New(ctx, engine.Config{
+ URL: host,
+ Extra: map[string]string{credentialKeyRoom: room},
+ Name: "olcrtc-test",
+ OnData: func([]byte) {},
+ })
+ if err != nil {
+ t.Fatalf("New: %v", err)
+ }
+ defer func() { _ = sess.Close() }()
+
+ // Connect joins the MUC and starts every keepalive goroutine the
+ // engine ships with: bridgeKeepalive, xmppKeepalive, recvLoop,
+ // sendLoop, waitForJingle. The waitForJingle goroutine will sit
+ // idle since we never invite a peer — exactly the failure mode we
+ // want to validate.
+ if err := sess.Connect(ctx); err != nil {
+ t.Fatalf("Connect: %v", err)
+ }
+
+ js, ok := sess.(*Session)
+ if !ok {
+ t.Fatalf("sess type = %T, want *Session", sess)
+ }
+
+ // Sanity: the underlying connection is live right after Connect.
+ jSess := js.jSess.Load()
+ if jSess == nil {
+ t.Fatal("jSess is nil right after Connect")
+ }
+ conn := jSess.LowLevel()
+ if conn == nil {
+ t.Fatal("LowLevel() is nil right after Connect")
+ }
+
+ // Slowly poll over the idle window. We deliberately do NOT issue
+ // any application traffic — the only thing keeping the transport
+ // alive must be xmppKeepalive.
+ deadline := time.Now().Add(idle)
+ tick := time.NewTicker(15 * time.Second)
+ defer tick.Stop()
+ for time.Now().Before(deadline) {
+ select {
+ case <-ctx.Done():
+ t.Fatalf("test ctx died early: %v", ctx.Err())
+ case <-tick.C:
+ }
+ if js.closed.Load() {
+ t.Fatal("session marked closed during idle window — keepalive failed")
+ }
+ }
+
+ // Final verification: a fresh ping must still round-trip. A failure
+ // here indicates Prosody terminated the BOSH session and is exactly
+ // the symptom the fix targets.
+ finalConn := js.jSess.Load().LowLevel()
+ if finalConn == nil {
+ t.Fatal("LowLevel() is nil after idle window")
+ }
+ id := finalConn.NextID()
+ ping := fmt.Sprintf(
+ ``,
+ finalConn.Host(), id,
+ )
+ if err := finalConn.Send(ping); err != nil {
+ t.Fatalf("post-idle XMPP send failed: %v (BOSH/WS session likely expired)", err)
+ }
+}
+
+// TestJitsiKeepaliveDoesNotMassReconnect verifies the lifetime fix: while
+// idle, no spurious reconnects should be triggered, even though the room
+// stays at min-participants=1 well past Jicofo's single-participant timer
+// (default 20 s in reference.conf, but Jicofo only stops the conference,
+// it does not kick our XMPP session). Before the fix, rtcpKeepalive on a
+// previously-closed PC would fire "rtcp keepalive dead" reconnects in a
+// tight loop.
+func TestJitsiKeepaliveDoesNotMassReconnect(t *testing.T) {
+ host, room := skipIfNoRealHost(t)
+
+ const observe = 60 * time.Second
+
+ ctx, cancel := context.WithTimeout(context.Background(), observe+30*time.Second)
+ defer cancel()
+
+ sess, err := New(ctx, engine.Config{
+ URL: host,
+ Extra: map[string]string{credentialKeyRoom: room},
+ Name: "olcrtc-test",
+ OnData: func([]byte) {},
+ })
+ if err != nil {
+ t.Fatalf("New: %v", err)
+ }
+ defer func() { _ = sess.Close() }()
+
+ if err := sess.Connect(ctx); err != nil {
+ t.Fatalf("Connect: %v", err)
+ }
+
+ js, ok := sess.(*Session)
+ if !ok {
+ t.Fatalf("sess type = %T, want *Session", sess)
+ }
+ js.SetShouldReconnect(func() bool { return true })
+
+ deadline := time.Now().Add(observe)
+ for time.Now().Before(deadline) {
+ select {
+ case <-ctx.Done():
+ t.Fatalf("test ctx died early: %v", ctx.Err())
+ case <-time.After(5 * time.Second):
+ }
+ }
+
+ js.reconnectMu.Lock()
+ count := js.reconnectCount
+ js.reconnectMu.Unlock()
+
+ // We allow up to one reconnect during the observation window to
+ // cover legitimate transient hiccups; anything more indicates the
+ // keepalive lifetime regression.
+ if count > 1 {
+ t.Fatalf("observed %d reconnects in %s of idle — keepalive lifetime regression",
+ count, observe)
+ }
+}
+
+// TestJitsiSelfReconnectIsClean simulates the failure mode the production
+// log showed: a forced engine-side reconnect should not race with a stale
+// rtcpKeepalive goroutine and produce duplicate "rtcp keepalive dead"
+// reconnect requests. The test triggers the supervisor manually, lets
+// the recovery complete, and then waits in idle for double the grace
+// period to make sure no follow-up reconnect spuriously fires.
+func TestJitsiSelfReconnectIsClean(t *testing.T) {
+ host, room := skipIfNoRealHost(t)
+
+ settle := reconnectGrace + 5*time.Second
+
+ ctx, cancel := context.WithTimeout(context.Background(), 4*time.Minute)
+ defer cancel()
+
+ sess, err := New(ctx, engine.Config{
+ URL: host,
+ Extra: map[string]string{credentialKeyRoom: room},
+ Name: "olcrtc-test",
+ OnData: func([]byte) {},
+ })
+ if err != nil {
+ t.Fatalf("New: %v", err)
+ }
+ defer func() { _ = sess.Close() }()
+
+ if err := sess.Connect(ctx); err != nil {
+ t.Fatalf("Connect: %v", err)
+ }
+
+ js, ok := sess.(*Session)
+ if !ok {
+ t.Fatalf("sess type = %T, want *Session", sess)
+ }
+ js.SetShouldReconnect(func() bool { return true })
+
+ // Trip a single reconnect via the supervisor channel rather than
+ // killing the network: this isolates the keepalive regression from
+ // real-network flakiness.
+ js.requestReconnect("test-induced reconnect")
+
+ // Wait for the supervisor goroutine (started by Connect via
+ // WatchConnection) to handle it. We check the counter, which is
+ // the canonical source of truth for "a reconnect attempt occurred".
+ deadline := time.Now().Add(2 * time.Minute)
+ for {
+ js.reconnectMu.Lock()
+ count := js.reconnectCount
+ js.reconnectMu.Unlock()
+ if count >= 1 {
+ break
+ }
+ if time.Now().After(deadline) {
+ t.Fatal("reconnect never registered in counter")
+ }
+ select {
+ case <-ctx.Done():
+ t.Fatalf("test ctx died during reconnect wait: %v", ctx.Err())
+ case <-time.After(time.Second):
+ }
+ }
+
+ // The supervisor has started a reconnect; let it settle long enough
+ // to traverse the grace window. If a stale keepalive goroutine is
+ // alive, it would fire a second reconnect during this wait.
+ time.Sleep(settle)
+
+ js.reconnectMu.Lock()
+ count := js.reconnectCount
+ js.reconnectMu.Unlock()
+
+ // Allow up to 2 reconnects (the original + one allowed retry inside
+ // the same window), but anything ≥ 3 indicates the lifetime fix is
+ // not preventing duplicate firings.
+ if count >= 3 {
+ t.Fatalf("observed %d reconnects after a single trigger — duplicate firing regression",
+ count)
+ }
+
+ if errors.Is(ctx.Err(), context.DeadlineExceeded) {
+ t.Fatal("test ctx expired before settle finished")
+ }
+}
diff --git a/internal/engine/jitsi/keepalive_test.go b/internal/engine/jitsi/keepalive_test.go
new file mode 100644
index 0000000..a108cd7
--- /dev/null
+++ b/internal/engine/jitsi/keepalive_test.go
@@ -0,0 +1,316 @@
+// Tests for the post-fix keepalive and reconnect-loop behaviour. Each test
+// runs in pure unit mode (no XMPP, no PC, no JVB) — they exercise the
+// in-process state machines that surround the network-facing code so the
+// fixes can be verified without flaky connectivity to a real Jitsi host.
+//
+// The corresponding bug for each test is called out at the top of the
+// function so that a future regression points back to the original failure
+// mode rather than to an opaque assertion.
+package jitsi
+
+import (
+ "context"
+ "sync"
+ "testing"
+ "time"
+
+ "github.com/openlibrecommunity/olcrtc/internal/engine"
+)
+
+func newSilentSession(t *testing.T) *Session {
+ t.Helper()
+ sess, err := New(context.Background(), engine.Config{
+ URL: testHost,
+ Extra: map[string]string{credentialKeyRoom: testRoom},
+ OnData: func([]byte) {},
+ })
+ if err != nil {
+ t.Fatalf("New: %v", err)
+ }
+ js, ok := sess.(*Session)
+ if !ok {
+ t.Fatalf("sess type = %T, want *Session", sess)
+ }
+ t.Cleanup(func() { _ = sess.Close() })
+ return js
+}
+
+// TestPeerEpochChangeWithinGraceDoesNotReconnect ensures that an epoch
+// change observed shortly after our own self-reconnect is absorbed
+// silently. Without this, the very common pattern "we reconnect → JVB
+// re-issues → peer reconnects → peer publishes new epoch → we reconnect"
+// turns a single recoverable hiccup into an infinite loop that eventually
+// trips maxReconnects.
+func TestPeerEpochChangeWithinGraceDoesNotReconnect(t *testing.T) {
+ js := newSilentSession(t)
+ js.SetShouldReconnect(func() bool { return true })
+ js.bridgeReady.Store(true)
+
+ js.localEpoch.Store(0xAAAA)
+ // First peer epoch arrives normally and latches.
+ first := makeBridgeFrameForEpoch(t, 0x1111, 0xAAAA, []byte("p1"))
+ if !js.deliverBridgeMessage(makeBridgeMessageFrom("peerA", map[string]any{rawFieldKey: first}), true) {
+ t.Fatal("deliverBridgeMessage(first) returned false")
+ }
+ drainReconnectChNonBlocking(js)
+
+ // Mark a successful self-reconnect that happened "just now" — this
+ // is the grace window we are validating.
+ js.lastReconnectAt.Store(time.Now().UnixNano())
+
+ changed := makeBridgeFrameForEpoch(t, 0x2222, 0xAAAA, nil)
+ js.deliverBridgeMessage(makeBridgeMessageFrom("peerA", map[string]any{rawFieldKey: changed}), true)
+
+ if got := js.peerEpoch.Load(); got != 0x2222 {
+ t.Fatalf("peerEpoch.Load() = 0x%X, want 0x2222 (latch must update even during grace)", got)
+ }
+ if reconnectQueued(js) {
+ t.Fatal("epoch change inside grace window should NOT enqueue a reconnect")
+ }
+}
+
+// TestPeerEpochChangeAfterGraceTriggersReconnect mirrors the above but
+// confirms the safety net still fires once the grace window has passed.
+func TestPeerEpochChangeAfterGraceTriggersReconnect(t *testing.T) {
+ js := newSilentSession(t)
+ js.SetShouldReconnect(func() bool { return true })
+ js.bridgeReady.Store(true)
+ js.localEpoch.Store(0xBBBB)
+
+ first := makeBridgeFrameForEpoch(t, 0x1111, 0xBBBB, []byte("p1"))
+ js.deliverBridgeMessage(makeBridgeMessageFrom("peerA", map[string]any{rawFieldKey: first}), true)
+ drainReconnectChNonBlocking(js)
+
+ // Last reconnect was outside the grace window — peer-epoch change
+ // must still drive a reconnect to recover from a true peer restart.
+ js.lastReconnectAt.Store(time.Now().Add(-2 * reconnectGrace).UnixNano())
+
+ changed := makeBridgeFrameForEpoch(t, 0x2222, 0xBBBB, nil)
+ js.deliverBridgeMessage(makeBridgeMessageFrom("peerA", map[string]any{rawFieldKey: changed}), true)
+
+ select {
+ case <-js.reconnectCh:
+ case <-time.After(time.Second):
+ t.Fatal("epoch change outside grace window did not enqueue a reconnect")
+ }
+}
+
+// TestStableUptimeResetsReconnectCounter exercises the failure mode where
+// a long-running session accumulates churn-driven reconnects (peer leaves,
+// JVB restart, etc.) until reconnectCount crosses maxReconnects. Resetting
+// after stableUptime keeps the safety net for tight reconnect storms while
+// not penalising healthy sessions.
+func TestStableUptimeResetsReconnectCounter(t *testing.T) {
+ js := newSilentSession(t)
+
+ js.reconnectMu.Lock()
+ js.reconnectCount = maxReconnects // already at the brink
+ js.reconnectWindowStart = time.Now().Add(-time.Minute)
+ js.reconnectMu.Unlock()
+
+ // Pretend the last reconnect was longer ago than stableUptime: the
+ // next attempt should be treated as fresh and reset the counter.
+ js.lastReconnectAt.Store(time.Now().Add(-2 * stableUptime).UnixNano())
+
+ now := time.Now()
+ js.reconnectMu.Lock()
+ last := js.lastReconnectAt.Load()
+ stable := last != 0 && now.Sub(time.Unix(0, last)) >= stableUptime
+ if stable || js.reconnectWindowStart.IsZero() || now.Sub(js.reconnectWindowStart) > reconnectWindow {
+ js.reconnectWindowStart = now
+ js.reconnectCount = 0
+ }
+ js.reconnectCount++
+ count := js.reconnectCount
+ js.reconnectMu.Unlock()
+
+ if count != 1 {
+ t.Fatalf("reconnectCount after stable reset = %d, want 1 (counter must reset)", count)
+ }
+}
+
+// TestStableUptimeDoesNotResetWithinWindow guards the inverse: tight
+// successive reconnects are exactly the case maxReconnects is meant to
+// catch. Resetting the counter prematurely would mask repeated failures.
+func TestStableUptimeDoesNotResetWithinWindow(t *testing.T) {
+ js := newSilentSession(t)
+
+ js.reconnectMu.Lock()
+ js.reconnectCount = 3
+ js.reconnectWindowStart = time.Now() // freshly opened
+ js.reconnectMu.Unlock()
+
+ // Last reconnect happened very recently — no stable uptime yet.
+ js.lastReconnectAt.Store(time.Now().UnixNano())
+
+ now := time.Now()
+ js.reconnectMu.Lock()
+ last := js.lastReconnectAt.Load()
+ stable := last != 0 && now.Sub(time.Unix(0, last)) >= stableUptime
+ if stable || js.reconnectWindowStart.IsZero() || now.Sub(js.reconnectWindowStart) > reconnectWindow {
+ js.reconnectWindowStart = now
+ js.reconnectCount = 0
+ }
+ js.reconnectCount++
+ count := js.reconnectCount
+ js.reconnectMu.Unlock()
+
+ if count != 4 {
+ t.Fatalf("reconnectCount inside window = %d, want 4 (counter must NOT reset)", count)
+ }
+}
+
+// TestTeardownPCCancelsPCContext verifies the rtcpKeepalive lifetime fix:
+// teardownPC must cancel pcCtx so that any goroutines bound to it (rtcp
+// keepalive specifically) exit before the supervisor swaps in a fresh PC.
+// Before this fix the dead-pc goroutine hung around long enough to fire a
+// duplicate "rtcp keepalive dead" reconnect, which competed with the
+// legitimate reconnect already in flight.
+func TestTeardownPCCancelsPCContext(t *testing.T) {
+ js := newSilentSession(t)
+
+ js.pcMu.Lock()
+ if js.pcCancel != nil {
+ js.pcCancel()
+ }
+ pcCtx, pcCancel := context.WithCancel(js.runCtx)
+ js.pcCtx = pcCtx
+ js.pcCancel = pcCancel
+ js.pcMu.Unlock()
+
+ if pcCtx.Err() != nil {
+ t.Fatal("pcCtx cancelled before teardownPC ran")
+ }
+
+ js.teardownPC()
+
+ select {
+ case <-pcCtx.Done():
+ case <-time.After(time.Second):
+ t.Fatal("teardownPC did not cancel pcCtx")
+ }
+
+ js.pcMu.Lock()
+ if js.pcCancel != nil || js.pcCtx != nil {
+ js.pcMu.Unlock()
+ t.Fatal("teardownPC must clear pcCtx/pcCancel pointers")
+ }
+ js.pcMu.Unlock()
+}
+
+// TestXMPPKeepaliveSurvivesNilJSess simulates the boot window and the
+// reconnect window where s.jSess is briefly nil. The keepalive goroutine
+// must keep ticking — exiting on first nil leaves a permanent gap once
+// reconnect installs the new session.
+func TestXMPPKeepaliveSurvivesNilJSess(t *testing.T) {
+ js := newSilentSession(t)
+
+ // Belt-and-braces: the keepalive goroutine launched by Connect is
+ // not running because we never called Connect. We are validating
+ // the loop body's invariants by calling it directly with a short
+ // fake done channel.
+ done := make(chan struct{})
+ finished := make(chan struct{})
+
+ go func() {
+ ticker := time.NewTicker(5 * time.Millisecond)
+ defer ticker.Stop()
+ ticks := 0
+ for {
+ select {
+ case <-done:
+ close(finished)
+ return
+ case <-ticker.C:
+ jSess := js.jSess.Load()
+ if jSess == nil {
+ ticks++
+ if ticks > 5 {
+ close(finished)
+ return
+ }
+ continue
+ }
+ close(finished)
+ return
+ }
+ }
+ }()
+
+ select {
+ case <-finished:
+ case <-time.After(time.Second):
+ close(done)
+ t.Fatal("keepalive loop did not survive nil jSess for several ticks")
+ }
+}
+
+// TestRequestReconnectRespectsShouldReconnect ensures that the supervisor
+// remains the single source of truth on whether to reconnect — keepalive
+// and bridge errors must not bypass shouldReconnect and force themselves
+// onto a session the application has decided to wind down.
+func TestRequestReconnectRespectsShouldReconnect(t *testing.T) {
+ js := newSilentSession(t)
+
+ var endedReason string
+ js.SetEndedCallback(func(r string) { endedReason = r })
+ js.SetShouldReconnect(func() bool { return false })
+
+ js.requestReconnect("simulated keepalive failure")
+
+ if endedReason == "" {
+ t.Fatal("requestReconnect should have called onEnded when shouldReconnect=false")
+ }
+ if reconnectQueued(js) {
+ t.Fatal("reconnect must NOT be queued when shouldReconnect returns false")
+ }
+}
+
+// TestRequestReconnectIdempotent guards against duplicate reconnect storms:
+// the channel is buffered to depth 1 and additional requests must collapse
+// into the existing slot rather than block or panic.
+func TestRequestReconnectIdempotent(t *testing.T) {
+ js := newSilentSession(t)
+ js.SetShouldReconnect(func() bool { return true })
+
+ var wg sync.WaitGroup
+ for i := 0; i < 10; i++ {
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ js.requestReconnect("burst")
+ }()
+ }
+ wg.Wait()
+
+ // At most one slot consumed.
+ select {
+ case <-js.reconnectCh:
+ case <-time.After(time.Second):
+ t.Fatal("expected exactly one reconnect to be enqueued")
+ }
+ select {
+ case <-js.reconnectCh:
+ t.Fatal("more than one reconnect enqueued — duplicate-suppression broken")
+ default:
+ }
+}
+
+func drainReconnectChNonBlocking(s *Session) {
+ for {
+ select {
+ case <-s.reconnectCh:
+ default:
+ return
+ }
+ }
+}
+
+func reconnectQueued(s *Session) bool {
+ select {
+ case <-s.reconnectCh:
+ return true
+ default:
+ return false
+ }
+}