diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 59cfb2d..7b08e2a 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -4,6 +4,11 @@ on: push: pull_request: branches: ["main", "master"] + schedule: + # Nightly stress soak — 03:17 UTC keeps it off the hour to avoid + # contention with the GitHub Actions hourly stampede. + - cron: "17 3 * * *" + workflow_dispatch: jobs: test: @@ -38,6 +43,23 @@ jobs: - name: Run tests with coverage run: go test -count=1 ./... --cover + race: + name: Test (-race) + runs-on: ubuntu-latest + timeout-minutes: 20 + steps: + - uses: actions/checkout@v4 + with: + submodules: recursive + + - name: Set up Go + uses: actions/setup-go@v5 + with: + go-version: "1.25.x" + + - name: Run tests with -race + run: go test -count=1 -race ./... + real-e2e: name: Real E2E (Providers x Transports) runs-on: ubuntu-latest @@ -62,6 +84,39 @@ jobs: -run '^TestRealProviderTransportMatrix$' \ -olcrtc.real-e2e + stress-soak: + name: Real E2E Stress Soak (Nightly) + # Long-form stress over real carriers: only on schedule or manual + # dispatch. Push and PR runs stay fast. 
+ if: github.event_name == 'schedule' || github.event_name == 'workflow_dispatch' + runs-on: ubuntu-latest + timeout-minutes: 90 + steps: + - uses: actions/checkout@v4 + with: + submodules: recursive + + - name: Set up Go + uses: actions/setup-go@v5 + with: + go-version: "1.25.x" + + - name: Install media tools + run: sudo apt-get update && sudo apt-get install -y ffmpeg + + - name: Run real stress soak + run: | + go test -count=1 -v ./internal/e2e \ + -run '^TestRealProviderTransportStress$' \ + -timeout=85m \ + -olcrtc.real-e2e \ + -olcrtc.stress \ + -olcrtc.real-carriers=telemost,wbstream,jazz,jitsi \ + -olcrtc.stress-bytes=16777216 \ + -olcrtc.stress-duration=120s \ + -olcrtc.stress-echo-size=1024 \ + -olcrtc.stress-case-timeout=8m + lint: name: Lint runs-on: ubuntu-latest diff --git a/internal/e2e/stress_test.go b/internal/e2e/stress_test.go new file mode 100644 index 0000000..98637c2 --- /dev/null +++ b/internal/e2e/stress_test.go @@ -0,0 +1,246 @@ +package e2e + +import ( + "bufio" + "bytes" + "context" + "errors" + "flag" + "fmt" + "io" + "net" + "runtime" + "slices" + "testing" + "time" + + enginebuiltin "github.com/openlibrecommunity/olcrtc/internal/engine/builtin" +) + +var ( + errStressNoRoundtrips = errors.New("no successful roundtrips within duration") + errStressPayloadMatch = errors.New("payload mismatch") +) + +var ( + realStress = flag.Bool( //nolint:gochecknoglobals // package-level state intentional + "olcrtc.stress", + false, + "run real provider stress matrix (bulk transfer + sustained echo) — requires -olcrtc.real-e2e", + ) + realStressBytes = flag.Int64( //nolint:gochecknoglobals // package-level state intentional + "olcrtc.stress-bytes", + 8<<20, // 8 MiB + "bytes to stream through each carrier×transport in the stress bulk phase", + ) + realStressDuration = flag.Duration( //nolint:gochecknoglobals // package-level state intentional + "olcrtc.stress-duration", + 30*time.Second, + "per-case duration for the sustained echo phase (set 0 to 
skip)", + ) + realStressEchoSize = flag.Int( //nolint:gochecknoglobals // package-level state intentional + "olcrtc.stress-echo-size", + 1024, + "single-roundtrip payload size during the sustained echo phase", + ) + realStressCaseTimeout = flag.Duration( //nolint:gochecknoglobals // package-level state intentional + "olcrtc.stress-case-timeout", + 5*time.Minute, + "hard timeout per stress carrier×transport case (covers connect + bulk + echo)", + ) +) + +// TestRealProviderTransportStress exercises every real carrier×transport +// combination under load. For each pair, two phases run sequentially over +// a single SOCKS connection: +// +// 1. Bulk phase: stream -olcrtc.stress-bytes through the tunnel and verify +// a deterministic pattern echoes back byte-for-byte. +// 2. Echo phase: send -olcrtc.stress-echo-size payloads as fast as the +// loop will go for -olcrtc.stress-duration, recording per-RT latency +// and computing p50/p95/p99. +// +// Around both phases we snapshot runtime.NumGoroutine to surface obvious +// goroutine leaks introduced by reconnect / bytestream / epoch regressions. +// +// Gated by -olcrtc.stress so it never runs on every push; intended for the +// nightly soak job in CI and for local stress profiling. 
+//
+//nolint:cyclop // matrix of carrier×transport expectations is naturally branchy
+func TestRealProviderTransportStress(t *testing.T) {
+	if !*realE2E {
+		t.Skip("real provider e2e disabled; pass -olcrtc.real-e2e to enable")
+	}
+	if !*realStress {
+		t.Skip("stress disabled; pass -olcrtc.stress to enable")
+	}
+
+	carriers := splitTestList(*realE2ECarriers)
+	transports := splitTestList(*realE2ETransports)
+	if len(carriers) == 0 {
+		t.Fatal("no real e2e carriers selected")
+	}
+	if len(transports) == 0 {
+		t.Fatal("no real e2e transports selected")
+	}
+
+	// A single echo server backs every carrier×transport case.
+	echoAddr := startEchoServer(t)
+	for _, carrierName := range carriers {
+		t.Run(carrierName, func(t *testing.T) {
+			roomCtx, cancelRoom := context.WithTimeout(context.Background(), *realStressCaseTimeout)
+			defer cancelRoom()
+			roomURL := requireRealRoom(roomCtx, t, carrierName)
+
+			// Latched by the first transport that fails authentication so
+			// the carrier's remaining transports skip instead of repeating
+			// the same failure.
+			var authFailed bool
+			for _, transportName := range transports {
+				t.Run(transportName, func(t *testing.T) {
+					if authFailed {
+						t.Skip("skipping: carrier auth failed on previous transport")
+					}
+					expectation := realE2ECaseExpectation(carrierName, transportName)
+					if expectation == realE2EExpectFail {
+						t.Skip("skipping: combo not expected to pass even at baseline")
+					}
+
+					err := runRealE2EStressCase(t, carrierName, transportName, roomURL, echoAddr)
+					// errors.Is is false for a nil err, so no separate nil
+					// check is needed. Skipf ends the subtest right here.
+					if errors.Is(err, enginebuiltin.ErrAuthFailed) {
+						authFailed = true
+						t.Skipf("skip %s stress: auth failed: %v", carrierName, err)
+					}
+					switch {
+					case err == nil:
+						t.Logf("STRESS OK %s/%s", carrierName, transportName)
+					case expectation == realE2EExpectUnstable:
+						logUnstableOutcome(t, "STRESS UNSTABLE", carrierName, transportName, err)
+					default:
+						t.Fatalf("STRESS FAIL %s/%s: %v", carrierName, transportName, err)
+					}
+				})
+			}
+		})
+	}
+}
+
+// runRealE2EStressCase runs one carrier×transport stress case. It starts a
+// real tunnel, opens a single SOCKS connection to echoAddr through it, and
+// runs on that connection the bulk phase (when -olcrtc.stress-bytes > 0)
+// followed by the sustained echo phase (when -olcrtc.stress-duration > 0).
+//
+// It returns the first setup or phase error. If the case itself succeeded
+// but stopping the tunnel reports an error, that stop error is returned.
+// Goroutine growth across the case is only logged, never turned into a
+// failure.
+//
+//nolint:cyclop // two phases plus tunnel/connection setup naturally branch
+func runRealE2EStressCase(t *testing.T, carrierName, transportName, roomURL, echoAddr string) (err error) {
+	t.Helper()
+
+	ctx, cancel := context.WithTimeout(context.Background(), *realStressCaseTimeout)
+	defer cancel()
+
+	goroutinesBefore := runtime.NumGoroutine()
+	// Registered before the tunnel and connection defers so that it runs
+	// after both of them (deferred calls execute last-in, first-out). Taking
+	// the second snapshot while the tunnel is still up would count its live
+	// goroutines as growth.
+	defer func() {
+		if err != nil {
+			return // only report growth for cases that otherwise passed
+		}
+		// Allow some slack — pion/quic spawn helpers that take time to wind
+		// down after Close, but a real leak shows up as tens of extra
+		// goroutines.
+		const goroutineLeakSlack = 30
+		if goroutinesAfter := runtime.NumGoroutine(); goroutinesAfter > goroutinesBefore+goroutineLeakSlack {
+			t.Logf("WARNING: goroutines grew %d -> %d during %s/%s",
+				goroutinesBefore, goroutinesAfter, carrierName, transportName)
+		}
+	}()
+
+	rt, err := startRealTunnel(ctx, t, carrierName, transportName, roomURL, testClientDeviceID, testClientDeviceID)
+	if err != nil {
+		return err
+	}
+	defer func() {
+		// A stop error only surfaces when the case has not already failed.
+		if stopErr := rt.stopErr(); err == nil && stopErr != nil {
+			err = stopErr
+		}
+	}()
+
+	conn, err := connectViaSOCKSWithin(rt.socksAddr, echoAddr, *realStressCaseTimeout)
+	if err != nil {
+		return err
+	}
+	defer func() { _ = conn.Close() }()
+
+	if size := *realStressBytes; size > 0 {
+		start := time.Now()
+		if err := streamPatternAndVerifyEcho(conn, size); err != nil {
+			return fmt.Errorf("bulk %d bytes: %w", size, err)
+		}
+		// Measure once so the logged duration and throughput agree.
+		elapsed := time.Since(start)
+		throughput := float64(size) / elapsed.Seconds() / (1 << 20)
+		t.Logf("bulk %s/%s: %d bytes in %s (%.2f MiB/s)",
+			carrierName, transportName, size, elapsed, throughput)
+	}
+
+	if d := *realStressDuration; d > 0 {
+		stats, err := sustainedEcho(conn, *realStressEchoSize, d)
+		if err != nil {
+			return fmt.Errorf("sustained echo: %w", err)
+		}
+		t.Logf("echo %s/%s: %d rt in %s, p50=%s p95=%s p99=%s max=%s lost=%d",
+			carrierName, transportName, stats.count, d,
+			stats.p50, stats.p95, stats.p99, stats.maxLatency, stats.lost)
+		if stats.count == 0 {
+			return fmt.Errorf("%w: %s", errStressNoRoundtrips, d)
+		}
+	}
+
+	return nil
+}
+
+// echoStats summarises one sustained echo run.
+type echoStats struct {
+	// count is the number of completed, byte-verified roundtrips.
+	count int
+	// lost counts failed writes/reads. The run stops at the first failure,
+	// so it is at most 1.
+	lost int
+	// Latency percentiles over completed roundtrips. They are filled in
+	// only when the run finishes without error.
+	p50, p95, p99 time.Duration
+	// maxLatency is the slowest completed roundtrip.
+	maxLatency time.Duration
+}
+
+// sustainedEcho writes payloads of size `payloadSize` and waits for them to
+// echo back, recording per-roundtrip latency. Runs until duration elapses
+// or the underlying connection fails. Each write/read uses a deadline so a
+// stuck transport surfaces as a finite-time test failure rather than a hang.
+//
+// payloadSize values below 4 are raised to 4. On error the stats gathered
+// so far are returned without percentiles, together with the error; a
+// payload that echoes back altered yields errStressPayloadMatch.
+//
+//nolint:cyclop // per-rt deadlines + error wrapping naturally branch many ways
+func sustainedEcho(conn net.Conn, payloadSize int, duration time.Duration) (echoStats, error) {
+	if payloadSize < 4 {
+		payloadSize = 4
+	}
+	deadline := time.Now().Add(duration)
+	payload := make([]byte, payloadSize)
+	for i := range payload {
+		payload[i] = byte('a' + (i % 26))
+	}
+	// Terminate the payload with a newline. Framing does not depend on it:
+	// the read side uses io.ReadFull with the fixed payload length.
+	payload[payloadSize-1] = '\n'
+
+	reader := bufio.NewReader(conn)
+	var stats echoStats
+	latencies := make([]time.Duration, 0, 1024)
+
+	buf := make([]byte, payloadSize)
+	for time.Now().Before(deadline) {
+		if err := conn.SetWriteDeadline(time.Now().Add(5 * time.Second)); err != nil {
+			return stats, fmt.Errorf("set write deadline: %w", err)
+		}
+		start := time.Now()
+		if _, err := conn.Write(payload); err != nil {
+			stats.lost++
+			return stats, fmt.Errorf("write at rt #%d: %w", stats.count, err)
+		}
+		if err := conn.SetReadDeadline(time.Now().Add(5 * time.Second)); err != nil {
+			return stats, fmt.Errorf("set read deadline: %w", err)
+		}
+		if _, err := io.ReadFull(reader, buf); err != nil {
+			stats.lost++
+			return stats, fmt.Errorf("read at rt #%d: %w", stats.count, err)
+		}
+		// Latency covers write + echo read, not the comparison below.
+		lat := time.Since(start)
+		if !bytes.Equal(buf, payload) {
+			return stats, fmt.Errorf("%w at rt #%d", errStressPayloadMatch, stats.count)
+		}
+		latencies = append(latencies, lat)
+		if lat > stats.maxLatency {
+			stats.maxLatency = lat
+		}
+		stats.count++
+	}
+
+	if len(latencies) > 0 {
+		slices.Sort(latencies)
+		// len*N/100 is always < len for N < 100; min() keeps the intent
+		// explicit for the upper percentiles.
+		stats.p50 = latencies[len(latencies)*50/100]
+		stats.p95 = latencies[min(len(latencies)*95/100, len(latencies)-1)]
+		stats.p99 = latencies[min(len(latencies)*99/100, len(latencies)-1)]
+	}
+	return stats, nil
+}
diff --git a/internal/engine/jitsi/churn_test.go b/internal/engine/jitsi/churn_test.go
new file mode 100644
index 0000000..b0e59f5
--- /dev/null
+++ b/internal/engine/jitsi/churn_test.go
@@ -0,0 +1,339 @@
+package jitsi
+
+import (
+	"context"
+	"encoding/binary"
+	"fmt"
+	"math/rand/v2"
+	"sync"
+	"sync/atomic"
+	"testing"
+	"time"
+
+	"github.com/openlibrecommunity/olcrtc/internal/engine"
+)
+
+// TestReconnectWindowResetsAfterTimeWindow covers fix 5d4592f: when the
+// reconnect window elapses, reconnectCount must roll back to zero so the
+// 5-attempt cap does not consume attempts accumulated long ago.
+//
+// Note: this test does not call handleReconnectAttempt. It drives
+// simulateAttempt, a test-local replica of the window-and-counter logic, so
+// it pins the intended arithmetic rather than the production branch itself.
+func TestReconnectWindowResetsAfterTimeWindow(t *testing.T) {
+	js := newChurnSession(t)
+	defer func() { _ = js.Close() }()
+
+	// Seed a window that expired one second ago and already holds
+	// maxReconnects attempts. Without rollover the next attempt would trip
+	// the cap; with rollover (window expired) it must start fresh.
+	js.reconnectMu.Lock()
+	js.reconnectWindowStart = time.Now().Add(-reconnectWindow - time.Second)
+	js.reconnectCount = maxReconnects
+	js.reconnectMu.Unlock()
+
+	count, rolled := simulateAttempt(js)
+	if !rolled {
+		t.Fatal("expected window rollover, got continuation of stale window")
+	}
+	if count != 1 {
+		t.Fatalf("reconnectCount after rollover = %d, want 1", count)
+	}
+}
+
+// TestReconnectWindowEnforcesCapWithinWindow covers the negative half of
+// fix 5d4592f: within a single window, attempts past the cap must signal
+// session end. Pairs with the rollover test above to lock in both branches.
+func TestReconnectWindowEnforcesCapWithinWindow(t *testing.T) {
+	js := newChurnSession(t)
+	defer func() { _ = js.Close() }()
+
+	// Buffered and written with a non-blocking send, so the callback can
+	// never block even if it fires more than once.
+	endedCh := make(chan string, 1)
+	js.SetEndedCallback(func(reason string) {
+		select {
+		case endedCh <- reason:
+		default:
+		}
+	})
+
+	// Seed window in the present so attempts accumulate without rollover.
+	js.reconnectMu.Lock()
+	js.reconnectWindowStart = time.Now()
+	js.reconnectCount = maxReconnects
+	js.reconnectMu.Unlock()
+
+	// One more attempt should exceed the cap and end the session.
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+	done := make(chan bool, 1)
+	go func() { done <- js.handleReconnectAttempt(ctx) }()
+
+	select {
+	case reason := <-endedCh:
+		if reason == "" {
+			t.Fatal("ended with empty reason")
+		}
+	case <-time.After(2 * time.Second):
+		t.Fatal("cap was not enforced within window")
+	}
+	// Cancel, then wait for the attempt goroutine so it does not outlive
+	// the test.
+	cancel()
+	<-done
+}
+
+// TestResetPeerClearsBindingForNewPeer covers fix 032151b: after an
+// upper-layer handshake failure the supervisor calls ResetPeer, and the
+// next peer in the room must be allowed to latch — not blocked by the
+// previously-latched (now stale) endpoint.
+//
+// jitsi_test.go has no coverage for this path.
+func TestResetPeerClearsBindingForNewPeer(t *testing.T) {
+	js := newChurnSession(t)
+	defer func() { _ = js.Close() }()
+
+	var got [][]byte
+	var mu sync.Mutex
+	js.onData = func(b []byte) {
+		mu.Lock()
+		// Copy: the callback's buffer is not assumed to stay valid.
+		got = append(got, append([]byte(nil), b...))
+		mu.Unlock()
+	}
+	js.localEpoch.Store(0xDEADBEEF)
+
+	// Peer A latches and delivers.
+	frameA := makeBridgeFrameForEpoch(t, 0x1111, 0, []byte("from-A"))
+	js.deliverBridgeMessage(makeBridgeMessageFrom("peerA", map[string]any{rawFieldKey: frameA}), true)
+
+	// Peer B tries while A still owns the latch — must be dropped.
+	frameB1 := makeBridgeFrameForEpoch(t, 0x2222, 0, []byte("from-B-blocked"))
+	js.deliverBridgeMessage(makeBridgeMessageFrom("peerB", map[string]any{rawFieldKey: frameB1}), true)
+
+	// Handshake failure recovery: reset.
+	js.ResetPeer()
+	if js.peerEpoch.Load() != 0 {
+		t.Fatalf("peerEpoch after ResetPeer = %#x, want 0", js.peerEpoch.Load())
+	}
+	if p := js.peerEndpoint.Load(); p != nil {
+		t.Fatalf("peerEndpoint after ResetPeer = %q, want nil", *p)
+	}
+
+	// Peer B retries and is now allowed.
+	frameB2 := makeBridgeFrameForEpoch(t, 0x2222, 0, []byte("from-B-allowed"))
+	js.deliverBridgeMessage(makeBridgeMessageFrom("peerB", map[string]any{rawFieldKey: frameB2}), true)
+
+	mu.Lock()
+	defer mu.Unlock()
+	if len(got) != 2 {
+		t.Fatalf("delivered = %d frames, want 2 (from-A then from-B-allowed): %q", len(got), got)
+	}
+	if string(got[0]) != "from-A" || string(got[1]) != "from-B-allowed" {
+		t.Fatalf("delivered = %q, want [from-A from-B-allowed]", got)
+	}
+}
+
+// TestChurnPeerEpochChanges hammers fix acac112 (epoch-based bridge frame
+// filtering) under churn: many epoch transitions in rapid succession from
+// the same peer, issued by 8 goroutines × 500 iterations. It asserts that:
+//   - no payload carrying a stale receiver-epoch is delivered;
+//   - at least one frame is delivered, i.e. the filter does not reject
+//     everything.
+//
+// It does not assert on peerEpoch or on reconnect signalling; the reconnect
+// channel is only drained after every frame.
+//
+// Run with -race to catch CAS misuses on peerEpoch / peerEndpoint.
+func TestChurnPeerEpochChanges(t *testing.T) {
+	js := newChurnSession(t)
+	defer func() { _ = js.Close() }()
+
+	js.localEpoch.Store(0x42424242)
+	js.SetShouldReconnect(func() bool { return true })
+
+	var delivered atomic.Uint64
+	var staleDelivered atomic.Uint64
+	js.onData = func(b []byte) {
+		delivered.Add(1)
+		// Stale frames in this test are tagged with the literal "STALE".
+		if len(b) >= 5 && string(b[:5]) == "STALE" {
+			staleDelivered.Add(1)
+		}
+	}
+
+	const iterations = 500
+	const goroutines = 8
+	var wg sync.WaitGroup
+	for g := range goroutines {
+		seed := uint64(g) + 1
+		wg.Go(func() {
+			rng := rand.New(rand.NewPCG(seed, seed^0x9E3779B97F4A7C15)) //nolint:gosec // weak RNG is fine for test fixtures
+			for i := range iterations {
+				switch rng.IntN(3) {
+				case 0:
+					// Fresh epoch; receiverEpoch=0 acts as announce.
+					// Clearing the low bit keeps it distinct from the odd
+					// fixed epochs (0x1111, 0x9999) used by the other
+					// cases. The zero guard is presumably what the former
+					// `|1` was for; the mask cancelled it out.
+					ep := uint32(rng.Uint64()) &^ 1 //nolint:gosec // truncation is the intent
+					if ep == 0 {
+						ep = 2
+					}
+					payload := fmt.Appendf(nil, "ok-%d-%d", seed, i)
+					raw := makeBridgeFrameForEpoch(t, ep, 0, payload)
+					js.deliverBridgeMessage(
+						makeBridgeMessageFrom("peerA",
+							map[string]any{rawFieldKey: raw}), true)
+				case 1:
+					// Stale: receiverEpoch mismatched with local. Must be dropped.
+					raw := makeBridgeFrameForEpoch(t, 0x1111, 0xBADBAD, []byte("STALE-rcv"))
+					js.deliverBridgeMessage(
+						makeBridgeMessageFrom("peerA",
+							map[string]any{rawFieldKey: raw}), true)
+				case 2:
+					// Acknowledging local epoch: must pass.
+					payload := fmt.Appendf(nil, "ack-%d-%d", seed, i)
+					raw := makeBridgeFrameForEpoch(t, 0x9999, 0x42424242, payload)
+					js.deliverBridgeMessage(
+						makeBridgeMessageFrom("peerA",
+							map[string]any{rawFieldKey: raw}), true)
+				}
+				drainReconnectCh(js)
+			}
+		})
+	}
+	wg.Wait()
+
+	if staleDelivered.Load() != 0 {
+		t.Fatalf("stale frames delivered: %d (filter regression)", staleDelivered.Load())
+	}
+	if delivered.Load() == 0 {
+		t.Fatal("no frames delivered at all — filter is too aggressive")
+	}
+}
+
+// TestChurnConcurrentResetAndDeliver races ResetPeer against concurrent
+// deliverBridgeMessage from multiple peers. It makes no explicit
+// assertions (delivered data is discarded); its value is in running
+// cleanly under -race, which would catch torn reads on peerEndpoint /
+// peerEpoch.
+func TestChurnConcurrentResetAndDeliver(t *testing.T) {
+	js := newChurnSession(t)
+	defer func() { _ = js.Close() }()
+
+	js.localEpoch.Store(0x55555555)
+	js.SetShouldReconnect(func() bool { return true })
+	js.onData = func([]byte) {} // discard
+
+	stop := make(chan struct{})
+	var wg sync.WaitGroup
+
+	// One sender per peer, each announcing its own fixed epoch in a tight
+	// loop until stop is closed.
+	for i, peer := range []string{"peerA", "peerB", "peerC"} {
+		ep := uint32(0x1000 * (i + 1))
+		wg.Go(func() {
+			for {
+				select {
+				case <-stop:
+					return
+				default:
+				}
+				raw := makeBridgeFrameForEpoch(t, ep, 0, []byte(peer))
+				js.deliverBridgeMessage(
+					makeBridgeMessageFrom(peer,
+						map[string]any{rawFieldKey: raw}), true)
+				drainReconnectCh(js)
+			}
+		})
+	}
+
+	// Resetter: clears the latch every 50µs while the senders run.
+	wg.Go(func() {
+		for {
+			select {
+			case <-stop:
+				return
+			default:
+			}
+			js.ResetPeer()
+			time.Sleep(time.Microsecond * 50)
+		}
+	})
+
+	time.Sleep(200 * time.Millisecond)
+	close(stop)
+	wg.Wait()
+}
+
+// TestChurnReconnectAttemptSerial runs the reconnect window-and-counter
+// logic across many synthetic windows back-to-back. It drives
+// simulateAttempt (a test-local replica of that logic), not
+// handleReconnectAttempt itself. A single goroutine performs the updates
+// while one extra reader takes the same lock concurrently, so -race
+// surfaces any access to the counters that bypasses reconnectMu.
+func TestChurnReconnectAttemptSerial(t *testing.T) {
+	js := newChurnSession(t)
+	defer func() { _ = js.Close() }()
+
+	stop := make(chan struct{})
+	var reader sync.WaitGroup
+	// Stop and join the reader on every exit path. t.Fatalf unwinds through
+	// deferred calls, so a failing iteration no longer leaves the reader
+	// spinning; this defer runs before js.Close above, so the reader never
+	// touches a closed session.
+	defer func() {
+		close(stop)
+		reader.Wait()
+	}()
+	reader.Go(func() {
+		// Reader: snapshots counters under the lock until told to stop.
+		for {
+			select {
+			case <-stop:
+				return
+			default:
+			}
+			js.reconnectMu.Lock()
+			_ = js.reconnectCount
+			_ = js.reconnectWindowStart
+			js.reconnectMu.Unlock()
+		}
+	})
+
+	for i := range 20 {
+		// Force rollover every iteration.
+		js.reconnectMu.Lock()
+		js.reconnectWindowStart = time.Now().Add(-reconnectWindow - time.Second)
+		js.reconnectCount = 0
+		js.reconnectMu.Unlock()
+
+		count, rolled := simulateAttempt(js)
+		if !rolled {
+			t.Fatalf("iter %d: expected rollover", i)
+		}
+		if count != 1 {
+			t.Fatalf("iter %d: count after rollover = %d, want 1", i, count)
+		}
+	}
+}
+
+// --- helpers ---
+
+// newChurnSession builds a Session through the package constructor using
+// the shared test host and room, failing the test if construction fails or
+// the result is not a *Session.
+func newChurnSession(t *testing.T) *Session {
+	t.Helper()
+	sess, err := New(context.Background(), engine.Config{
+		URL:   testHost,
+		Extra: map[string]string{credentialKeyRoom: testRoom},
+	})
+	if err != nil {
+		t.Fatalf("New: %v", err)
+	}
+	js, ok := sess.(*Session)
+	if !ok {
+		t.Fatal("sess is not *Session")
+	}
+	return js
+}
+
+// simulateAttempt replicates the window-and-counter logic of
+// handleReconnectAttempt without invoking reconnect() (which would touch
+// real network state). Returns (post-increment count, true-if-window-rolled).
+//
+// Because it is a replica, it has to be kept in step with the production
+// logic by hand.
+func simulateAttempt(js *Session) (int, bool) {
+	now := time.Now()
+	js.reconnectMu.Lock()
+	defer js.reconnectMu.Unlock()
+	rolled := false
+	// An unset window, or one older than reconnectWindow, starts over.
+	if js.reconnectWindowStart.IsZero() || now.Sub(js.reconnectWindowStart) > reconnectWindow {
+		js.reconnectWindowStart = now
+		js.reconnectCount = 0
+		rolled = true
+	}
+	js.reconnectCount++
+	return js.reconnectCount, rolled
+}
+
+// drainReconnectCh removes at most one pending signal from the reconnect
+// channel without blocking.
+func drainReconnectCh(js *Session) {
+	select {
+	case <-js.reconnectCh:
+	default:
+	}
+}
+
+// Keep binary.BigEndian referenced even if all current uses are removed.
+var _ = binary.BigEndian
diff --git a/internal/transport/common/stress_test.go b/internal/transport/common/stress_test.go
new file mode 100644
index 0000000..d93a113
--- /dev/null
+++ b/internal/transport/common/stress_test.go
@@ -0,0 +1,150 @@
+package common_test
+
+import (
+	"bytes"
+	"hash/crc32"
+	"math/rand/v2"
+	"sync"
+	"testing"
+
+	"github.com/openlibrecommunity/olcrtc/internal/transport/common"
+)
+
+// TestReassemblerStressShuffledFragments feeds one reassembler the
+// fragments of 200 random messages in a single, fully shuffled stream that
+// also contains duplicates (each fragment is duplicated with probability
+// 0.20). Fragments of different seqs are therefore interleaved and arrive
+// out of order.
+//
+// Invariants checked:
+//   - no seq is reported as ResultDelivered more than once;
+//   - every message is delivered and matches its original bytes exactly;
+//   - at least one push is reported as ResultDuplicate.
+//
+//nolint:cyclop // stress fixture intentionally exercises many branches in one test
+func TestReassemblerStressShuffledFragments(t *testing.T) {
+	if testing.Short() {
+		t.Skip("skipping stress test in -short mode")
+	}
+	const (
+		messages = 200
+		fragSize = 64
+	)
+	reasm := common.NewReassembler(messages * 2)
+	rng := rand.New(rand.NewPCG(0xC0FFEE, 0xDEADBEEF)) //nolint:gosec // weak RNG is fine for test fixtures
+
+	// sent records what was generated for one message so delivery can be
+	// checked afterwards.
+	type sent struct {
+		seq     uint32
+		payload []byte
+		nfrags  int
+	}
+
+	// wire collects every fragment of every message in generation order;
+	// dups collects the injected duplicates and is appended afterwards, so
+	// the pre-shuffle order is "all originals, then all duplicates".
+	var wire, dups []common.Fragment
+	sentMsgs := make([]sent, 0, messages)
+	for i := range messages {
+		payload := make([]byte, 50+rng.IntN(2000))
+		for j := range payload {
+			payload[j] = byte(rng.Uint32()) //nolint:gosec // truncation is the intent
+		}
+		pieces := common.FragmentPayload(payload, fragSize)
+		seq := uint32(i + 1) //nolint:gosec // test fixture, bounded
+		sum := crc32.ChecksumIEEE(payload)
+		for idx, piece := range pieces {
+			frag := common.Fragment{
+				Seq:       seq,
+				CRC:       sum,
+				TotalLen:  uint32(len(payload)), //nolint:gosec // test fixture, bounded
+				FragIdx:   uint16(idx),          //nolint:gosec // bounded
+				FragTotal: uint16(len(pieces)),  //nolint:gosec // bounded
+				Payload:   piece,
+			}
+			wire = append(wire, frag)
+			// 20% duplicate injection
+			if rng.Float64() < 0.20 {
+				dups = append(dups, frag)
+			}
+		}
+		sentMsgs = append(sentMsgs, sent{seq: seq, payload: payload, nfrags: len(pieces)})
+	}
+	wire = append(wire, dups...)
+	rng.Shuffle(len(wire), func(a, b int) { wire[a], wire[b] = wire[b], wire[a] })
+
+	got := make(map[uint32][]byte, messages)
+	var dupCount int
+	for _, frag := range wire {
+		res, data := reasm.Push(frag)
+		switch res {
+		case common.ResultPartial, common.ResultIgnore:
+			// expected; nothing to record
+		case common.ResultDuplicate:
+			dupCount++
+		case common.ResultDelivered:
+			if prev, seen := got[frag.Seq]; seen {
+				// Re-delivery would be a logic error.
+				t.Fatalf("seq %d delivered twice (was %d bytes, now %d)", frag.Seq, len(prev), len(data))
+			}
+			got[frag.Seq] = append([]byte(nil), data...)
+		}
+	}
+
+	for _, m := range sentMsgs {
+		data, ok := got[m.seq]
+		if !ok {
+			t.Fatalf("seq %d never delivered (had %d fragments)", m.seq, m.nfrags)
+		}
+		if !bytes.Equal(data, m.payload) {
+			t.Fatalf("seq %d payload mismatch: got %d bytes, want %d", m.seq, len(data), len(m.payload))
+		}
+	}
+	if dupCount == 0 {
+		t.Fatal("test injected duplicates but reassembler reported none — duplicate path not exercised")
+	}
+	t.Logf("delivered %d/%d messages, observed %d duplicates", len(got), messages, dupCount)
+}
+
+// TestReassemblerConcurrentPushIsSafe drives many goroutines pushing
+// fragments for distinct seqs into the same reassembler. The reassembler
+// must serialize via its mutex without deadlocking or torn-state.
+// Run with -race.
+func TestReassemblerConcurrentPushIsSafe(t *testing.T) {
+	if testing.Short() {
+		t.Skip("skipping concurrent stress test in -short mode")
+	}
+	const (
+		writers   = 16
+		perWriter = 50
+		fragSize  = 32
+	)
+	reasm := common.NewReassembler(writers * perWriter * 2)
+
+	// pushMessage builds one random payload, fragments it and pushes the
+	// fragments in a random order. Push results are deliberately ignored:
+	// this test only checks that concurrent pushes complete (and, under
+	// -race, that they do not race).
+	pushMessage := func(rng *rand.Rand, seq uint32) {
+		payload := make([]byte, 30+rng.IntN(500))
+		for j := range payload {
+			payload[j] = byte(rng.Uint32()) //nolint:gosec // truncation is the intent
+		}
+		sum := crc32.ChecksumIEEE(payload)
+		pieces := common.FragmentPayload(payload, fragSize)
+		for _, idx := range rng.Perm(len(pieces)) {
+			reasm.Push(common.Fragment{
+				Seq:       seq,
+				CRC:       sum,
+				TotalLen:  uint32(len(payload)), //nolint:gosec // bounded
+				FragIdx:   uint16(idx),          //nolint:gosec // bounded
+				FragTotal: uint16(len(pieces)),  //nolint:gosec // bounded
+				Payload:   pieces[idx],
+			})
+		}
+	}
+
+	var wg sync.WaitGroup
+	for w := range writers {
+		// Each writer owns a disjoint seq range: w*perWriter+1 .. (w+1)*perWriter.
+		firstSeq := uint32(w*perWriter) + 1
+		wg.Go(func() {
+			rng := rand.New(rand.NewPCG(uint64(w)+1, 0xC0DE)) //nolint:gosec // test seed
+			for i := range perWriter {
+				pushMessage(rng, firstSeq+uint32(i))
+			}
+		})
+	}
+	wg.Wait()
+}
diff --git a/internal/transport/vp8channel/chaos_test.go b/internal/transport/vp8channel/chaos_test.go
new file mode 100644
index 0000000..94fd515
--- /dev/null
+++ b/internal/transport/vp8channel/chaos_test.go
@@ -0,0 +1,278 @@
+package vp8channel
+
+import (
+	"bytes"
+	"math/rand/v2"
+	"sync"
+	"sync/atomic"
+	"testing"
+	"time"
+)
+
+// chaosCfg configures chaosPump, a drop-in replacement for pumpPackets that
+// injects network pathology between two kcpRuntimes. The fields drive
+// loss/reorder/delay; all three default to "pass through" when zero.
+//
+// The pump sits at the same seam as production: kcpConn.WriteTo emits
+// packets into `from`; the pump forwards them (or not) into `to.deliver()`.
+// Real network hardware does the same things at the IP layer.
+type chaosCfg struct {
+	lossRatio    float64       // 0..1 probability of dropping a packet
+	reorderRatio float64       // 0..1 probability of delaying a packet by `reorderHold`
+	reorderHold  time.Duration // extra hold-and-release delay for reordered packets
+	latency      time.Duration // base one-way latency applied to every packet
+	seed         uint64        // RNG seed; 0 picks 1
+}
+
+// chaosPump forwards packets from `from` into `to.deliver`, applying the
+// loss, latency and reorder settings in cfg. It runs until `stop` is closed
+// or `from` is closed, and is meant to be started in its own goroutine.
+//
+// Packets no longer than epochHdrLen are discarded; for longer ones the
+// first epochHdrLen bytes are stripped before delivery. When dropped is
+// non-nil it is incremented for every packet removed by loss injection.
+// Packets still held when the pump stops are discarded.
+//
+//nolint:cyclop // chaos pump intentionally has several independent injection paths
+func chaosPump(
+	t *testing.T,
+	stop <-chan struct{},
+	from <-chan []byte,
+	to *kcpRuntime,
+	cfg chaosCfg,
+	dropped *atomic.Uint64,
+) {
+	t.Helper()
+	seed := cfg.seed
+	if seed == 0 {
+		seed = 1
+	}
+	rng := rand.New(rand.NewPCG(seed, seed^0x9E3779B97F4A7C15)) //nolint:gosec // weak RNG is fine for test fixtures
+
+	// Delayed packets wait here until their release time has passed.
+	type held struct {
+		release time.Time
+		pkt     []byte
+	}
+	var holdMu sync.Mutex // guards holdQ
+	var holdQ []held
+	releaseTick := time.NewTicker(2 * time.Millisecond)
+	defer releaseTick.Stop()
+
+	forward := func(p []byte) {
+		if len(p) > epochHdrLen {
+			to.deliver(p[epochHdrLen:])
+		}
+	}
+
+	for {
+		select {
+		case <-stop:
+			return
+		case <-releaseTick.C:
+			holdMu.Lock()
+			now := time.Now()
+			// In-place filter: forward what is due (in queue order), keep
+			// the rest.
+			kept := holdQ[:0]
+			for _, h := range holdQ {
+				if !now.Before(h.release) {
+					forward(h.pkt)
+					continue
+				}
+				kept = append(kept, h)
+			}
+			holdQ = kept
+			holdMu.Unlock()
+		case pkt, ok := <-from:
+			if !ok {
+				// A closed channel yields nil packets forever; stop instead
+				// of spinning.
+				return
+			}
+			pkt = append([]byte(nil), pkt...) // detach from sender buffer
+			if cfg.lossRatio > 0 && rng.Float64() < cfg.lossRatio {
+				if dropped != nil {
+					dropped.Add(1)
+				}
+				continue
+			}
+			// Model delay as a per-packet release time rather than sleeping
+			// here: a sleep would stall this loop, turning latency into a
+			// throughput cap and postponing the release of held packets.
+			delay := max(cfg.latency, 0)
+			if cfg.reorderRatio > 0 && cfg.reorderHold > 0 && rng.Float64() < cfg.reorderRatio {
+				delay += cfg.reorderHold
+			}
+			if delay == 0 {
+				forward(pkt)
+				continue
+			}
+			holdMu.Lock()
+			holdQ = append(holdQ, held{release: time.Now().Add(delay), pkt: pkt})
+			holdMu.Unlock()
+		}
+	}
+}
+
+// runChaosLoopback wires a chaotic channel A↔B, sends msgs from A, and
+// verifies B receives them in order.
+// It returns the time from just before the first send until B has
+// signalled that everything arrived, and the number of A→B packets removed
+// by loss injection. The test fails if that does not happen within timeout.
+func runChaosLoopback(t *testing.T, msgs [][]byte, cfg chaosCfg, timeout time.Duration) (time.Duration, uint64) {
+	t.Helper()
+
+	aToB := make(chan []byte, 1024)
+	bToA := make(chan []byte, 1024)
+
+	onMessage, allReceived, received := buildReceiver(len(msgs))
+
+	sender, err := startKCP(aToB, nil, testEpochHdr(1))
+	if err != nil {
+		t.Fatalf("startKCP A: %v", err)
+	}
+	defer sender.close()
+
+	receiver, err := startKCP(bToA, onMessage, testEpochHdr(2))
+	if err != nil {
+		t.Fatalf("startKCP B: %v", err)
+	}
+	defer receiver.close()
+
+	// Closed first on return (defers run last-in, first-out), which tells
+	// both pumps to stop before the runtimes are closed.
+	stop := make(chan struct{})
+	defer close(stop)
+
+	var droppedAB atomic.Uint64
+	go chaosPump(t, stop, aToB, receiver, cfg, &droppedAB)
+	// Return path stays clean by default — KCP ACKs must come back reliably
+	// for fair loss measurement; loss on one direction is enough to stress.
+	// A zero chaosCfg never drops, so no counter is needed for it.
+	go chaosPump(t, stop, bToA, sender, chaosCfg{}, nil)
+
+	begin := time.Now()
+	for _, msg := range msgs {
+		if err := sender.send(msg); err != nil {
+			t.Fatalf("send: %v", err)
+		}
+	}
+
+	deadline := time.NewTimer(timeout)
+	defer deadline.Stop()
+	select {
+	case <-allReceived:
+	case <-deadline.C:
+		t.Fatalf("timeout: got %d/%d messages, dropped A->B=%d", len(received()), len(msgs), droppedAB.Load())
+	}
+	elapsed := time.Since(begin)
+
+	checkMessages(t, received(), msgs)
+	return elapsed, droppedAB.Load()
+}
+
+// TestKCPSurvivesModeratePacketLoss confirms KCP's ARQ delivers all
+// messages despite ~10% packet loss. This is the headline regression
+// guard: if anything in vp8channel's KCP wiring (window size, retransmit
+// pacing, conv stability) regresses, this test will flake or time out.
+func TestKCPSurvivesModeratePacketLoss(t *testing.T) {
+	if testing.Short() {
+		t.Skip("skipping chaos test in -short mode")
+	}
+	// Fixed seed keeps the drop pattern reproducible between runs.
+	lossy := chaosCfg{lossRatio: 0.10, seed: 0xC0FFEE}
+	payloads := [][]byte{
+		[]byte("alpha"),
+		bytes.Repeat([]byte("B"), 2000),
+		bytes.Repeat([]byte("C"), 8000),
+		bytes.Repeat([]byte("D"), 20000),
+	}
+
+	elapsed, lost := runChaosLoopback(t, payloads, lossy, 20*time.Second)
+	t.Logf("delivered %d msgs in %s with %d packets dropped (10%% loss)", len(payloads), elapsed, lost)
+	// Guard against a vacuous pass: the run only means something if the
+	// pump actually dropped packets.
+	if lost == 0 {
+		t.Fatal("chaos pump did not drop any packets — loss injection broken")
+	}
+}
+
+// TestKCPSurvivesReorder confirms KCP delivers messages in order even when
+// ~20% of packets are held for 30ms and released late. The original
+// author's note says videochannel does NOT tolerate this (sequence+CRC
+// reassembly that drops on reorder), whereas KCP under vp8channel must.
+func TestKCPSurvivesReorder(t *testing.T) {
+	if testing.Short() {
+		t.Skip("skipping chaos test in -short mode")
+	}
+	reordering := chaosCfg{
+		reorderRatio: 0.20,
+		reorderHold:  30 * time.Millisecond,
+		seed:         0xBEEF,
+	}
+	payloads := [][]byte{
+		bytes.Repeat([]byte("R"), 4000),
+		bytes.Repeat([]byte("S"), 12000),
+		bytes.Repeat([]byte("T"), 30000),
+	}
+
+	elapsed, _ := runChaosLoopback(t, payloads, reordering, 15*time.Second)
+	t.Logf("reorder-tolerant delivery in %s", elapsed)
+}
+
+// TestKCPRecoversFromBurstLoss simulates a complete blackout for ~200ms
+// then full restoration. This mirrors a real connectivity blip: the
+// transport should not give up; KCP should resend everything queued
+// during the blackout once the path comes back.
+//
+//nolint:cyclop // setup + gated pump + assertions naturally branch several ways
+func TestKCPRecoversFromBurstLoss(t *testing.T) {
+	if testing.Short() {
+		t.Skip("skipping chaos test in -short mode")
+	}
+	payloads := [][]byte{
+		bytes.Repeat([]byte("X"), 1500),
+		bytes.Repeat([]byte("Y"), 1500),
+		bytes.Repeat([]byte("Z"), 1500),
+	}
+
+	aToB := make(chan []byte, 1024)
+	bToA := make(chan []byte, 1024)
+	onMessage, allReceived, received := buildReceiver(len(payloads))
+
+	sender, err := startKCP(aToB, nil, testEpochHdr(1))
+	if err != nil {
+		t.Fatalf("startKCP A: %v", err)
+	}
+	defer sender.close()
+	receiver, err := startKCP(bToA, onMessage, testEpochHdr(2))
+	if err != nil {
+		t.Fatalf("startKCP B: %v", err)
+	}
+	defer receiver.close()
+
+	done := make(chan struct{})
+	defer close(done)
+
+	// relay copies packets from in to dst until done is closed. While
+	// blackout is set every packet is discarded; packets no longer than
+	// epochHdrLen are discarded as well.
+	var blackout atomic.Bool
+	relay := func(in <-chan []byte, dst *kcpRuntime) {
+		for {
+			select {
+			case <-done:
+				return
+			case pkt := <-in:
+				if blackout.Load() || len(pkt) <= epochHdrLen {
+					continue
+				}
+				dst.deliver(pkt[epochHdrLen:])
+			}
+		}
+	}
+	go relay(aToB, receiver)
+	go relay(bToA, sender)
+
+	// Begin in blackout, send messages, wait, then lift.
+	blackout.Store(true)
+	for _, msg := range payloads {
+		if err := sender.send(msg); err != nil {
+			t.Fatalf("send: %v", err)
+		}
+	}
+	time.Sleep(200 * time.Millisecond)
+	blackout.Store(false)
+
+	recovery := time.NewTimer(15 * time.Second)
+	defer recovery.Stop()
+	select {
+	case <-allReceived:
+	case <-recovery.C:
+		t.Fatalf("did not recover from blackout: got %d/%d", len(received()), len(payloads))
+	}
+	checkMessages(t, received(), payloads)
+}
+
+// TestKCPThroughputBaseline establishes a perfect-channel throughput floor.
+// Not an assertion — if this number regresses meaningfully on the same
+// hardware, something changed in KCP options (window size, MTU, tick).
+func TestKCPThroughputBaseline(t *testing.T) {
+	if testing.Short() {
+		t.Skip("skipping throughput baseline in -short mode")
+	}
+	const (
+		payloadSize = 8000
+		messages    = 50
+		totalBytes  = messages * payloadSize
+	)
+
+	// Each message is payloadSize copies of one letter, cycling A..Z.
+	batch := make([][]byte, 0, messages)
+	for i := range messages {
+		fill := byte('A' + (i % 26))
+		batch = append(batch, bytes.Repeat([]byte{fill}, payloadSize))
+	}
+
+	// A zero chaosCfg injects nothing, so this measures the clean path.
+	elapsed, _ := runChaosLoopback(t, batch, chaosCfg{}, 30*time.Second)
+	mibPerSec := float64(totalBytes) / elapsed.Seconds() / (1 << 20)
+	t.Logf("baseline: %d bytes in %s = %.2f MiB/s", totalBytes, elapsed, mibPerSec)
+}