From 1d0a1467e4ad8f54ef774e4fc4b5fd49d9581718 Mon Sep 17 00:00:00 2001 From: zarazaex69 Date: Mon, 1 Jun 2026 13:14:34 +0300 Subject: [PATCH] test(jitsi): replace keepalive tests with paired chaos --- .../jitsi/keepalive_integration_test.go | 291 ----------- internal/engine/jitsi/paired_chaos_test.go | 476 ++++++++++++++++++ 2 files changed, 476 insertions(+), 291 deletions(-) delete mode 100644 internal/engine/jitsi/keepalive_integration_test.go create mode 100644 internal/engine/jitsi/paired_chaos_test.go diff --git a/internal/engine/jitsi/keepalive_integration_test.go b/internal/engine/jitsi/keepalive_integration_test.go deleted file mode 100644 index 6d591f4..0000000 --- a/internal/engine/jitsi/keepalive_integration_test.go +++ /dev/null @@ -1,291 +0,0 @@ -// Real-server keepalive stress tests. These exercise the engine against a -// live Jitsi deployment and verify that: -// -// 1. The XMPP transport stays alive past Prosody's BOSH 60s idle timeout -// (bosh_max_inactivity in jitsi-meet.cfg.lua), i.e. our xmppKeepalive -// goroutine actually keeps the long-poll session pinned. Without the -// fix, WaitJingle returns "connection closed" exactly once per 60s. -// -// 2. Idle wait does not wedge the engine: after 90s alone in the room -// we are still able to issue Send/CanSend without ErrSessionClosed. -// -// Both tests are gated behind an env variable so the package's regular -// `go test` workflow stays hermetic and fast. To run them locally: -// -// OLCRTC_JITSI_KEEPALIVE_HOST=meet.handyweb.org \ -// OLCRTC_JITSI_KEEPALIVE_ROOM=olcrtc-stress-$(date +%s) \ -// go test -count=1 -v -timeout 5m \ -// -run '^TestJitsiKeepalive' ./internal/engine/jitsi/... -// -// Reuse the same room name across runs sparingly: jicofo treats each room -// as a focus session and may take a few seconds to garbage-collect after -// the previous run leaves. 
- -package jitsi - -import ( - "context" - "errors" - "fmt" - "os" - "strings" - "testing" - "time" - - "github.com/openlibrecommunity/olcrtc/internal/engine" -) - -const ( - envKeepaliveHost = "OLCRTC_JITSI_KEEPALIVE_HOST" - envKeepaliveRoom = "OLCRTC_JITSI_KEEPALIVE_ROOM" -) - -func skipIfNoRealHost(t *testing.T) (host, room string) { - t.Helper() - host = strings.TrimSpace(os.Getenv(envKeepaliveHost)) - if host == "" { - t.Skipf("set %s to a real Jitsi host (e.g. meet.handyweb.org) to enable", envKeepaliveHost) - } - room = strings.TrimSpace(os.Getenv(envKeepaliveRoom)) - if room == "" { - room = fmt.Sprintf("olcrtc-keepalive-%d", time.Now().UnixNano()) - } - return host, room -} - -// TestJitsiKeepaliveSurvivesProsodyBOSHIdle is the canary for the BOSH -// inactivity timeout regression: prior to the keepalive fix, joining a -// real Jitsi room and idling for 90 seconds always failed with the j -// library reporting "connection closed" because Prosody's BOSH module had -// expired the long-poll session. -// -// We deliberately do NOT call sess.Connect because Connect attempts a full -// j.JoinMUC which is more flaky against unknown deployments than a -// minimum-viable smoke. Instead, we exercise the keepalive paths under -// realistic conditions by: -// -// - Constructing a Session and JoinMUC-ing through the j library directly. -// - Storing the result so the engine's keepalive goroutines (started by -// Connect) would see jSess.Load() == this session. -// - Spinning the keepalive in-place against the live LowLevel() conn. -// - Verifying after 90 s that conn.Send still succeeds — which is exactly -// what Prosody's BOSH inactivity timer kills without the fix. -// -// Test takes ~95 s on a clean run, so it's gated behind an env var. 
-func TestJitsiKeepaliveSurvivesProsodyBOSHIdle(t *testing.T) { - host, room := skipIfNoRealHost(t) - - const idle = 90 * time.Second - - ctx, cancel := context.WithTimeout(context.Background(), idle+30*time.Second) - defer cancel() - - sess, err := New(ctx, engine.Config{ - URL: host, - Extra: map[string]string{credentialKeyRoom: room}, - Name: "olcrtc-test", - OnData: func([]byte) {}, - }) - if err != nil { - t.Fatalf("New: %v", err) - } - defer func() { _ = sess.Close() }() - - // Connect joins the MUC and starts every keepalive goroutine the - // engine ships with: bridgeKeepalive, xmppKeepalive, recvLoop, - // sendLoop, waitForJingle. The waitForJingle goroutine will sit - // idle since we never invite a peer — exactly the failure mode we - // want to validate. - if err := sess.Connect(ctx); err != nil { - t.Fatalf("Connect: %v", err) - } - - js, ok := sess.(*Session) - if !ok { - t.Fatalf("sess type = %T, want *Session", sess) - } - - // Sanity: the underlying connection is live right after Connect. - jSess := js.jSess.Load() - if jSess == nil { - t.Fatal("jSess is nil right after Connect") - } - conn := jSess.LowLevel() - if conn == nil { - t.Fatal("LowLevel() is nil right after Connect") - } - - // Slowly poll over the idle window. We deliberately do NOT issue - // any application traffic — the only thing keeping the transport - // alive must be xmppKeepalive. - deadline := time.Now().Add(idle) - tick := time.NewTicker(15 * time.Second) - defer tick.Stop() - for time.Now().Before(deadline) { - select { - case <-ctx.Done(): - t.Fatalf("test ctx died early: %v", ctx.Err()) - case <-tick.C: - } - if js.closed.Load() { - t.Fatal("session marked closed during idle window — keepalive failed") - } - } - - // Final verification: a fresh ping must still round-trip. A failure - // here indicates Prosody terminated the BOSH session and is exactly - // the symptom the fix targets. 
- finalConn := js.jSess.Load().LowLevel() - if finalConn == nil { - t.Fatal("LowLevel() is nil after idle window") - } - id := finalConn.NextID() - ping := fmt.Sprintf( - ``, - finalConn.Host(), id, - ) - if err := finalConn.Send(ping); err != nil { - t.Fatalf("post-idle XMPP send failed: %v (BOSH/WS session likely expired)", err) - } -} - -// TestJitsiKeepaliveDoesNotMassReconnect verifies the lifetime fix: while -// idle, no spurious reconnects should be triggered, even though the room -// stays at min-participants=1 well past Jicofo's single-participant timer -// (default 20 s in reference.conf, but Jicofo only stops the conference, -// it does not kick our XMPP session). Before the fix, rtcpKeepalive on a -// previously-closed PC would fire "rtcp keepalive dead" reconnects in a -// tight loop. -func TestJitsiKeepaliveDoesNotMassReconnect(t *testing.T) { - host, room := skipIfNoRealHost(t) - - const observe = 60 * time.Second - - ctx, cancel := context.WithTimeout(context.Background(), observe+30*time.Second) - defer cancel() - - sess, err := New(ctx, engine.Config{ - URL: host, - Extra: map[string]string{credentialKeyRoom: room}, - Name: "olcrtc-test", - OnData: func([]byte) {}, - }) - if err != nil { - t.Fatalf("New: %v", err) - } - defer func() { _ = sess.Close() }() - - if err := sess.Connect(ctx); err != nil { - t.Fatalf("Connect: %v", err) - } - - js, ok := sess.(*Session) - if !ok { - t.Fatalf("sess type = %T, want *Session", sess) - } - js.SetShouldReconnect(func() bool { return true }) - - deadline := time.Now().Add(observe) - for time.Now().Before(deadline) { - select { - case <-ctx.Done(): - t.Fatalf("test ctx died early: %v", ctx.Err()) - case <-time.After(5 * time.Second): - } - } - - js.reconnectMu.Lock() - count := js.reconnectCount - js.reconnectMu.Unlock() - - // We allow up to one reconnect during the observation window to - // cover legitimate transient hiccups; anything more indicates the - // keepalive lifetime regression. 
- if count > 1 { - t.Fatalf("observed %d reconnects in %s of idle — keepalive lifetime regression", - count, observe) - } -} - -// TestJitsiSelfReconnectIsClean simulates the failure mode the production -// log showed: a forced engine-side reconnect should not race with a stale -// rtcpKeepalive goroutine and produce duplicate "rtcp keepalive dead" -// reconnect requests. The test triggers the supervisor manually, lets -// the recovery complete, and then waits in idle for double the grace -// period to make sure no follow-up reconnect spuriously fires. -func TestJitsiSelfReconnectIsClean(t *testing.T) { - host, room := skipIfNoRealHost(t) - - settle := reconnectGrace + 5*time.Second - - ctx, cancel := context.WithTimeout(context.Background(), 4*time.Minute) - defer cancel() - - sess, err := New(ctx, engine.Config{ - URL: host, - Extra: map[string]string{credentialKeyRoom: room}, - Name: "olcrtc-test", - OnData: func([]byte) {}, - }) - if err != nil { - t.Fatalf("New: %v", err) - } - defer func() { _ = sess.Close() }() - - if err := sess.Connect(ctx); err != nil { - t.Fatalf("Connect: %v", err) - } - - js, ok := sess.(*Session) - if !ok { - t.Fatalf("sess type = %T, want *Session", sess) - } - js.SetShouldReconnect(func() bool { return true }) - - // Trip a single reconnect via the supervisor channel rather than - // killing the network: this isolates the keepalive regression from - // real-network flakiness. - js.requestReconnect("test-induced reconnect") - - // Wait for the supervisor goroutine (started by Connect via - // WatchConnection) to handle it. We check the counter, which is - // the canonical source of truth for "a reconnect attempt occurred". 
- deadline := time.Now().Add(2 * time.Minute) - for { - js.reconnectMu.Lock() - count := js.reconnectCount - js.reconnectMu.Unlock() - if count >= 1 { - break - } - if time.Now().After(deadline) { - t.Fatal("reconnect never registered in counter") - } - select { - case <-ctx.Done(): - t.Fatalf("test ctx died during reconnect wait: %v", ctx.Err()) - case <-time.After(time.Second): - } - } - - // The supervisor has started a reconnect; let it settle long enough - // to traverse the grace window. If a stale keepalive goroutine is - // alive, it would fire a second reconnect during this wait. - time.Sleep(settle) - - js.reconnectMu.Lock() - count := js.reconnectCount - js.reconnectMu.Unlock() - - // Allow up to 2 reconnects (the original + one allowed retry inside - // the same window), but anything ≥ 3 indicates the lifetime fix is - // not preventing duplicate firings. - if count >= 3 { - t.Fatalf("observed %d reconnects after a single trigger — duplicate firing regression", - count) - } - - if errors.Is(ctx.Err(), context.DeadlineExceeded) { - t.Fatal("test ctx expired before settle finished") - } -} diff --git a/internal/engine/jitsi/paired_chaos_test.go b/internal/engine/jitsi/paired_chaos_test.go new file mode 100644 index 0000000..6615c12 --- /dev/null +++ b/internal/engine/jitsi/paired_chaos_test.go @@ -0,0 +1,476 @@ +// Paired-instance chaos stress for the jitsi engine. +// +// Why paired: a single instance never receives session-initiate from +// Jicofo because of min-participants=2 (jicofo/.../reference.conf). +// Without a peer the bridge never opens and most of the engine's +// reconnect logic — peer-epoch latch, bridgeKeepalive, RTCP keepalive — +// is never exercised. The single-client tests proved that xmppKeepalive +// holds the BOSH session for a single endpoint, but the production +// failure mode the user actually observes (DTLS CloseNotify → cascading +// reconnects) is a property of the *paired* path. +// +// What this test does: +// +// 1. 
Spawn TWO Session instances against the same real Jitsi host and +// room, with shared bytes flowing between them. +// 2. Continuously pump small data through the bridge in both directions. +// 3. Periodically introduce chaos: +// - Force a teardownPC + requestReconnect on one side. +// - Long idle pauses (>60s) so both Prosody BOSH idle and JVB +// inactivityTimeout fire if any keepalive is broken. +// - Random side selection so both directions get exercised. +// 4. Track per-cycle outcomes and fail the test if the pair wedges +// (the surviving side sees no new receive within idle+60s of a +// chaos kick) or if either session is marked closed. +// +// Configuration via env (no flags so opt-in is one variable): +// +// OLCRTC_JITSI_PAIRED_HOST required, e.g. meet.handyweb.org +// OLCRTC_JITSI_PAIRED_ROOM optional, defaults to a unique name +// OLCRTC_JITSI_PAIRED_DURATION default 30m, "0"/"infinite"/"forever" runs forever +// OLCRTC_JITSI_PAIRED_IDLE default 75s +// OLCRTC_JITSI_PAIRED_CHAOS_INTERVAL default 60s — how often to cause chaos +// OLCRTC_JITSI_PAIRED_VERBOSE default off +// +// Quick run: +// +// OLCRTC_JITSI_PAIRED_HOST=meet.handyweb.org \ +// go test -count=1 -v -timeout 35m \ +// -run '^TestJitsiPairedChaosStress$' ./internal/engine/jitsi/... +// +// Forever (Ctrl-C to stop; NOTE(review): no signal handler is installed in +// this file, so confirm the summary is actually printed on interrupt): +// +// OLCRTC_JITSI_PAIRED_HOST=meet.handyweb.org \ +// OLCRTC_JITSI_PAIRED_DURATION=0 \ +// go test -count=1 -v -timeout 0 \ +// -run '^TestJitsiPairedChaosStress$' ./internal/engine/jitsi/... 
+
+package jitsi
+
+import (
+	"context"
+	"fmt"
+	"math/rand"
+	"os"
+	"strings"
+	"sync"
+	"sync/atomic"
+	"testing"
+	"time"
+
+	"github.com/openlibrecommunity/olcrtc/internal/engine"
+)
+
+// Environment variables that opt in to, and tune, the paired chaos test.
+const (
+	envPairedHost          = "OLCRTC_JITSI_PAIRED_HOST"
+	envPairedRoom          = "OLCRTC_JITSI_PAIRED_ROOM"
+	envPairedDuration      = "OLCRTC_JITSI_PAIRED_DURATION"
+	envPairedIdle          = "OLCRTC_JITSI_PAIRED_IDLE"
+	envPairedChaosInterval = "OLCRTC_JITSI_PAIRED_CHAOS_INTERVAL"
+	envPairedVerbose       = "OLCRTC_JITSI_PAIRED_VERBOSE"
+)
+
+// pairedConfig is the resolved configuration for one chaos run.
+type pairedConfig struct {
+	host, room    string
+	duration      time.Duration // 0 means "run until the context ends"
+	idle          time.Duration // idle observation window per cycle
+	chaosInterval time.Duration // period of the chaos ticker; always > 0
+	verbose       bool
+}
+
+// durationLabel renders the run budget for logs, spelling out the
+// zero value as "infinite".
+func (c *pairedConfig) durationLabel() string {
+	if c.duration == 0 {
+		return "infinite"
+	}
+	return c.duration.String()
+}
+
+// readPairedConfig builds the run configuration from the environment.
+//
+// The test is skipped when the host variable is unset. Malformed or
+// out-of-range durations abort the test immediately: a non-positive chaos
+// interval would otherwise panic inside time.NewTicker, and a negative
+// idle/duration would silently produce an instantly-expired run.
+func readPairedConfig(t *testing.T) *pairedConfig {
+	t.Helper()
+	host := strings.TrimSpace(os.Getenv(envPairedHost))
+	if host == "" {
+		t.Skipf("set %s to a real Jitsi host (e.g. meet.handyweb.org) to enable", envPairedHost)
+	}
+
+	// envDuration returns the parsed override for key, or def when the
+	// variable is unset/blank. A parse failure is fatal.
+	envDuration := func(key string, def time.Duration) time.Duration {
+		raw := strings.TrimSpace(os.Getenv(key))
+		if raw == "" {
+			return def
+		}
+		d, err := time.ParseDuration(raw)
+		if err != nil {
+			t.Fatalf("%s=%q: %v", key, raw, err)
+		}
+		return d
+	}
+
+	cfg := &pairedConfig{
+		host:     host,
+		room:     fmt.Sprintf("olcrtc-paired-%d", time.Now().UnixNano()),
+		duration: 30 * time.Minute,
+	}
+	if v := strings.TrimSpace(os.Getenv(envPairedRoom)); v != "" {
+		cfg.room = v
+	}
+
+	// The duration accepts a few keywords in addition to Go durations.
+	switch strings.ToLower(strings.TrimSpace(os.Getenv(envPairedDuration))) {
+	case "0", "infinite", "forever":
+		cfg.duration = 0
+	default:
+		cfg.duration = envDuration(envPairedDuration, cfg.duration)
+	}
+	cfg.idle = envDuration(envPairedIdle, 75*time.Second)
+	cfg.chaosInterval = envDuration(envPairedChaosInterval, 60*time.Second)
+
+	if cfg.duration < 0 {
+		t.Fatalf("%s=%s: must not be negative", envPairedDuration, cfg.duration)
+	}
+	if cfg.idle < 0 {
+		t.Fatalf("%s=%s: must not be negative", envPairedIdle, cfg.idle)
+	}
+	if cfg.chaosInterval <= 0 {
+		t.Fatalf("%s=%s: must be positive", envPairedChaosInterval, cfg.chaosInterval)
+	}
+
+	if v := strings.TrimSpace(os.Getenv(envPairedVerbose)); v != "" {
+		cfg.verbose = v != "0" && strings.ToLower(v) != "false"
+	}
+	return cfg
+}
+
+// pairedInstance wraps one half of the test pair and tracks rolling stats
+// that the chaos loop uses to decide when to declare a wedge.
+type pairedInstance struct {
+	name string
+	js   *Session
+
+	mu                sync.Mutex // guards the two fields below
+	receivedFromOther int64      // number of non-empty OnData callbacks
+	lastReceiveAt     time.Time  // wall-clock time of the latest one
+}
+
+// note is the OnData callback: it records the arrival of a non-empty
+// payload. Empty payloads are ignored.
+func (p *pairedInstance) note(b []byte) {
+	if len(b) == 0 {
+		return
+	}
+	p.mu.Lock()
+	p.receivedFromOther++
+	p.lastReceiveAt = time.Now()
+	p.mu.Unlock()
+}
+
+// snapshot returns the receive counter and the time of the last receive
+// as one consistent pair.
+func (p *pairedInstance) snapshot() (count int64, lastAt time.Time) {
+	p.mu.Lock()
+	defer p.mu.Unlock()
+	return p.receivedFromOther, p.lastReceiveAt
+}
+
+// startInstance spins up one Session at a time so the second one is
+// guaranteed to see the first as a peer (Jicofo session-initiate fires
+// only when min-participants is reached).
+//
+// The session is created with inst.note as its OnData sink, allowed to
+// reconnect for as long as ctx is live, connected, and then handed to
+// WatchConnection on a goroutine bound to ctx. On any failure the
+// session is closed and a wrapped error is returned.
+func startInstance(ctx context.Context, t *testing.T, cfg *pairedConfig, name string) (*pairedInstance, error) {
+	t.Helper()
+	inst := &pairedInstance{name: name}
+
+	sess, err := New(ctx, engine.Config{
+		URL:    cfg.host,
+		Extra:  map[string]string{credentialKeyRoom: cfg.room},
+		Name:   name,
+		OnData: inst.note,
+	})
+	if err != nil {
+		return nil, fmt.Errorf("new %s: %w", name, err)
+	}
+	js, ok := sess.(*Session)
+	if !ok {
+		_ = sess.Close() // best-effort cleanup; the cast error is what matters
+		return nil, fmt.Errorf("%s: cast to *Session failed", name)
+	}
+	js.SetShouldReconnect(func() bool { return ctx.Err() == nil })
+	inst.js = js
+
+	if err := sess.Connect(ctx); err != nil {
+		_ = sess.Close() // best-effort cleanup; the connect error is what matters
+		return nil, fmt.Errorf("connect %s: %w", name, err)
+	}
+	go js.WatchConnection(ctx)
+	return inst, nil
+}
+
+// waitForBridge polls until the bridge is open on `inst` or the deadline
+// passes. The bridge only opens after Jicofo issues session-initiate,
+// which requires both participants to be in the room.
+func waitForBridge(ctx context.Context, inst *pairedInstance, deadline time.Time) error {
+	for {
+		// Check first, so a bridge that is already up is reported as
+		// ready even when the (shared) deadline has just elapsed, and so
+		// the state is re-examined after the final sleep.
+		if inst.js.bridgeReady.Load() {
+			return nil
+		}
+		if !time.Now().Before(deadline) {
+			return fmt.Errorf("%s: bridge not ready before deadline", inst.name)
+		}
+		select {
+		case <-ctx.Done():
+			return ctx.Err()
+		case <-time.After(time.Second):
+		}
+	}
+}
+
+// pumpLoop sends a small heartbeat payload from `from` to the other
+// side every interval. The receive side uses inst.note to record arrival.
+// We intentionally use the engine's Send (not SendTo) to exercise the
+// peer-latch path.
+//
+// Each heartbeat is a private copy of payload; when the payload is at
+// least 8 bytes long its first 8 bytes are overwritten with a
+// little-endian sequence number. Ticks on which CanSend reports false are
+// skipped. The loop returns when ctx is done.
+func pumpLoop(ctx context.Context, t *testing.T, from *pairedInstance, interval time.Duration, payload []byte) {
+	t.Helper()
+	ticker := time.NewTicker(interval)
+	defer ticker.Stop()
+
+	var seq uint64
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		case <-ticker.C:
+		}
+		if !from.js.CanSend() {
+			continue
+		}
+		seq++
+		buf := append([]byte(nil), payload...)
+		if len(buf) >= 8 {
+			for i := 0; i < 8; i++ {
+				buf[i] = byte(seq >> (8 * i))
+			}
+		}
+		// Send may legitimately fail mid-reconnect; the chaos
+		// supervisor will catch a permanent wedge, so the error is
+		// deliberately dropped here.
+		_ = from.js.Send(buf)
+	}
+}
+
+// pairedStats accumulates counters for the end-of-run summary.
+type pairedStats struct {
+	cycles            int64
+	chaosKicks        int64
+	wedgesPair        int64        // chaos cycles where the survivor saw no fresh receive in time
+	maxObservedRttMs  atomic.Int64 // NOTE(review): not written anywhere in the visible code
+	startedAt         time.Time
+	lastChaosAt       time.Time
+	bothSidesReceived bool
+}
+
+// TestJitsiPairedChaosStress is the real chaos validator. It joins the
+// same room with two engine instances, pumps data both ways, and then
+// loops:
+//
+//   - idle wait > Prosody BOSH timeout (60s) and JVB inactivityTimeout
+//   - forced teardownPC + requestReconnect on a randomly-chosen side
+//   - confirm both sides recover (CanSend == true and we see a fresh
+//     receive within a bounded window)
+//
+// Failure modes guarded:
+//
+//   - One side wedges (CanSend stuck false) past idle + chaosInterval.
+//   - No application bytes flow across a chaos cycle (the recovery
+//     never re-establishes the bridge frame path).
+//   - Either side hits ErrSessionClosed at the engine level
+//     (the closed flag is the canonical "we gave up" signal).
+//
+//nolint:cyclop // chaos cycle structure naturally branches on phase + side
+func TestJitsiPairedChaosStress(t *testing.T) {
+	cfg := readPairedConfig(t)
+	infinite := cfg.duration == 0
+
+	t.Logf("[paired] host=%s room=%s duration=%s idle=%s chaos-interval=%s verbose=%v",
+		cfg.host, cfg.room, cfg.durationLabel(), cfg.idle, cfg.chaosInterval, cfg.verbose)
+
+	// A bounded run gets five minutes of slack on top of the chaos budget
+	// for setup and the last in-flight cycle; an infinite run is only
+	// bounded by cancel.
+	var (
+		ctx    context.Context
+		cancel context.CancelFunc
+	)
+	if infinite {
+		ctx, cancel = context.WithCancel(context.Background())
+	} else {
+		ctx, cancel = context.WithTimeout(context.Background(), cfg.duration+5*time.Minute)
+	}
+	defer cancel()
+
+	// Spin up Alice first so she's already in the room when Bob arrives —
+	// this guarantees min-participants triggers session-initiate.
+	alice, err := startInstance(ctx, t, cfg, "alice")
+	if err != nil {
+		t.Fatalf("alice: %v", err)
+	}
+	defer func() { _ = alice.js.Close() }()
+
+	// Brief settle so Alice is fully in the MUC before Bob joins.
+	time.Sleep(2 * time.Second)
+
+	bob, err := startInstance(ctx, t, cfg, "bob")
+	if err != nil {
+		t.Fatalf("bob: %v", err)
+	}
+	defer func() { _ = bob.js.Close() }()
+
+	// Now Jicofo should issue session-initiate to both. Give it some time
+	// to actually open the bridge on each side. Both waits share one
+	// absolute deadline.
+	bridgeBudget := time.Now().Add(90 * time.Second)
+	if err := waitForBridge(ctx, alice, bridgeBudget); err != nil {
+		t.Fatalf("alice bridge: %v", err)
+	}
+	if err := waitForBridge(ctx, bob, bridgeBudget); err != nil {
+		t.Fatalf("bob bridge: %v", err)
+	}
+	t.Log("[paired] both bridges ready, starting pumps")
+
+	// Background pumps: each side sends a heartbeat every 2s. The other
+	// side records arrivals via OnData. This is the actual end-to-end
+	// liveness signal — if it stops flowing, the bridge is dead.
+	pumpCtx, pumpCancel := context.WithCancel(ctx)
+	defer pumpCancel()
+	payload := []byte("0123456789abcdef-paired-keepalive-stress-payload")
+	go pumpLoop(pumpCtx, t, alice, 2*time.Second, payload)
+	go pumpLoop(pumpCtx, t, bob, 2*time.Second, payload)
+
+	// Wait for the first roundtrip in each direction so we know the
+	// pumps are functional before chaos starts.
+	if err := waitFirstReceive(ctx, alice, bob, 60*time.Second); err != nil {
+		t.Fatalf("first roundtrip: %v", err)
+	}
+	t.Log("[paired] first bidirectional roundtrip OK, entering chaos loop")
+
+	stats := pairedStats{startedAt: time.Now()}
+	// The summary runs on every exit path that unwinds this function,
+	// including t.Fatalf.
+	defer func() { reportPairedStats(t, &stats, cfg) }()
+	stats.bothSidesReceived = true
+
+	rng := rand.New(rand.NewSource(time.Now().UnixNano())) //nolint:gosec // test randomness only
+
+	deadline := time.Time{}
+	if !infinite {
+		deadline = stats.startedAt.Add(cfg.duration)
+	}
+	chaosTick := time.NewTicker(cfg.chaosInterval)
+	defer chaosTick.Stop()
+
+	for {
+		// Exit checks come before the cycle counter is bumped, so the
+		// counter (and the summary) only reflect cycles that actually ran.
+		if !infinite && time.Now().After(deadline) {
+			t.Logf("[paired] cycle=%d budget exhausted, ending", stats.cycles)
+			return
+		}
+		if ctx.Err() != nil {
+			t.Logf("[paired] cycle=%d ctx ended (%v), ending", stats.cycles, ctx.Err())
+			return
+		}
+		if alice.js.closed.Load() || bob.js.closed.Load() {
+			t.Fatalf("session closed mid-stress: alice.closed=%v bob.closed=%v",
+				alice.js.closed.Load(), bob.js.closed.Load())
+		}
+		stats.cycles++
+
+		// === Phase A: idle while pumps continue ===
+		// Tests that neither BOSH nor JVB inactivityTimeout fires
+		// while we're pumping at 2s intervals.
+		if cfg.verbose {
+			t.Logf("[paired][%d] idle observation %s", stats.cycles, cfg.idle)
+		}
+		select {
+		case <-ctx.Done():
+			return
+		case <-time.After(cfg.idle):
+		}
+
+		// === Phase B: pick a victim and chaos ===
+		// startInstance names the instances "alice" and "bob", so the
+		// instance name doubles as the log label.
+		victim, survivor := alice, bob
+		if rng.Intn(2) == 1 {
+			victim, survivor = bob, alice
+		}
+		if cfg.verbose {
+			t.Logf("[paired][%d] CHAOS victim=%s — teardownPC + requestReconnect", stats.cycles, victim.name)
+		}
+		victim.js.teardownPC()
+		victim.js.requestReconnect(fmt.Sprintf("paired chaos cycle=%d victim=%s", stats.cycles, victim.name))
+		stats.chaosKicks++
+		stats.lastChaosAt = time.Now()
+
+		// === Phase C: recovery deadline ===
+		// Bound the recovery window. If the victim does not produce
+		// a fresh receive on the survivor within recoveryBudget, the
+		// engine has wedged.
+		recoveryBudget := cfg.idle + 60*time.Second
+		if err := waitFreshReceive(ctx, survivor, recoveryBudget); err != nil {
+			stats.wedgesPair++
+			t.Errorf("[paired][%d] WEDGE survivor=%s did not receive after %s chaos on %s: %v",
+				stats.cycles, survivor.name, recoveryBudget, victim.name, err)
+			// Try to keep going; production behaviour is what we're
+			// trying to capture, not a single hard failure.
+			continue
+		}
+		if cfg.verbose {
+			t.Logf("[paired][%d] recovered, survivor=%s saw fresh receive", stats.cycles, survivor.name)
+		}
+
+		// Honour the chaos tick so we don't burn through cycles
+		// faster than the configured cadence.
+		select {
+		case <-ctx.Done():
+			return
+		case <-chaosTick.C:
+		}
+	}
+}
+
+// waitFirstReceive blocks until each side has seen at least one OnData
+// invocation. Returns ctx.Err() or a deadline error.
+func waitFirstReceive(ctx context.Context, a, b *pairedInstance, budget time.Duration) error {
+	deadline := time.Now().Add(budget)
+	for {
+		// Evaluate the counters before looking at the clock: this way the
+		// values quoted in the timeout error are the ones that were
+		// actually tested, and a receive that lands during the final
+		// sleep still counts as success.
+		ac, _ := a.snapshot()
+		bc, _ := b.snapshot()
+		if ac > 0 && bc > 0 {
+			return nil
+		}
+		if !time.Now().Before(deadline) {
+			return fmt.Errorf("did not see bidirectional roundtrip in %s (alice=%d bob=%d)",
+				budget, ac, bc)
+		}
+		select {
+		case <-ctx.Done():
+			return ctx.Err()
+		case <-time.After(500 * time.Millisecond):
+		}
+	}
+}
+
+// waitFreshReceive blocks until target sees a NEW receive after this call,
+// i.e. the count strictly increases. This is how we observe that the
+// bridge fully recovered: bytes are arriving from the (forced-to-reconnect)
+// peer.
+//
+// It returns nil on a new receive, ctx.Err() if ctx ends first, or a
+// timeout error once budget has elapsed. The counter is always re-read
+// after the last sleep before a timeout is declared.
+func waitFreshReceive(ctx context.Context, target *pairedInstance, budget time.Duration) error {
+	startCount, _ := target.snapshot()
+	deadline := time.Now().Add(budget)
+	for {
+		if c, _ := target.snapshot(); c > startCount {
+			return nil
+		}
+		if !time.Now().Before(deadline) {
+			return fmt.Errorf("no new receive in %s (count stuck at %d)", budget, startCount)
+		}
+		select {
+		case <-ctx.Done():
+			return ctx.Err()
+		case <-time.After(500 * time.Millisecond):
+		}
+	}
+}
+
+// reportPairedStats logs the end-of-run summary and marks the test as
+// failed when at least one wedge was recorded.
+func reportPairedStats(t *testing.T, s *pairedStats, cfg *pairedConfig) {
+	t.Helper()
+	elapsed := time.Since(s.startedAt).Round(time.Second)
+	t.Logf(
+		"[paired] DONE elapsed=%s cycles=%d chaosKicks=%d wedges=%d duration=%s idle=%s",
+		elapsed, s.cycles, s.chaosKicks, s.wedgesPair, cfg.durationLabel(), cfg.idle,
+	)
+	if s.wedgesPair > 0 {
+		t.Errorf("observed %d pair wedges (recovery never produced new bytes)", s.wedgesPair)
+	}
+}