test: add nightly stress and churn coverage

This commit is contained in:
zarazaex69
2026-05-16 23:49:22 +03:00
parent 5347c80db5
commit b4dc6d2531
5 changed files with 1068 additions and 0 deletions

View File

@@ -4,6 +4,11 @@ on:
push:
pull_request:
branches: ["main", "master"]
schedule:
# Nightly stress soak — 03:17 UTC keeps it off the hour to avoid
# contention with the GitHub Actions hourly stampede.
- cron: "17 3 * * *"
workflow_dispatch:
jobs:
test:
@@ -38,6 +43,23 @@ jobs:
- name: Run tests with coverage
run: go test -count=1 ./... --cover
# Runs the whole module's tests with the Go race detector enabled.
race:
name: Test (-race)
runs-on: ubuntu-latest
# Job-level cap, in minutes.
timeout-minutes: 20
steps:
- uses: actions/checkout@v4
with:
submodules: recursive
- name: Set up Go
uses: actions/setup-go@v5
with:
go-version: "1.25.x"
# -count=1 bypasses the go test result cache so every run re-executes.
- name: Run tests with -race
run: go test -count=1 -race ./...
real-e2e:
name: Real E2E (Providers x Transports)
runs-on: ubuntu-latest
@@ -62,6 +84,39 @@ jobs:
-run '^TestRealProviderTransportMatrix$' \
-olcrtc.real-e2e
# Nightly soak: runs TestRealProviderTransportStress from ./internal/e2e with
# the stress flags set explicitly in the run step.
stress-soak:
name: Real E2E Stress Soak (Nightly)
# Long-form stress over real carriers: only on schedule or manual
# dispatch. Push and PR runs stay fast.
if: github.event_name == 'schedule' || github.event_name == 'workflow_dispatch'
runs-on: ubuntu-latest
# Job cap in minutes; the go test invocation in the run step passes
# -timeout=85m, which leaves headroom for checkout and tool setup.
timeout-minutes: 90
steps:
- uses: actions/checkout@v4
with:
submodules: recursive
- name: Set up Go
uses: actions/setup-go@v5
with:
go-version: "1.25.x"
# NOTE(review): ffmpeg is presumably needed by the media-based transports — confirm.
- name: Install media tools
run: sudo apt-get update && sudo apt-get install -y ffmpeg
- name: Run real stress soak
run: |
go test -count=1 -v ./internal/e2e \
-run '^TestRealProviderTransportStress$' \
-timeout=85m \
-olcrtc.real-e2e \
-olcrtc.stress \
-olcrtc.real-carriers=telemost,wbstream,jazz,jitsi \
-olcrtc.stress-bytes=16777216 \
-olcrtc.stress-duration=120s \
-olcrtc.stress-echo-size=1024 \
-olcrtc.stress-case-timeout=8m
lint:
name: Lint
runs-on: ubuntu-latest

246
internal/e2e/stress_test.go Normal file
View File

@@ -0,0 +1,246 @@
package e2e
import (
"bufio"
"bytes"
"context"
"errors"
"flag"
"fmt"
"io"
"net"
"runtime"
"slices"
"testing"
"time"
enginebuiltin "github.com/openlibrecommunity/olcrtc/internal/engine/builtin"
)
// Sentinel errors produced by the stress helpers in this file. They are
// wrapped with %w at the call sites so callers can match them with errors.Is.
var (
// errStressNoRoundtrips is returned when the sustained echo phase ends with
// zero completed roundtrips.
errStressNoRoundtrips = errors.New("no successful roundtrips within duration")
// errStressPayloadMatch is returned when an echoed payload differs from the
// payload that was sent.
errStressPayloadMatch = errors.New("payload mismatch")
)
// Command-line flags controlling TestRealProviderTransportStress. Each value
// is a pointer returned by the flag package and is dereferenced at use.
var (
// realStress gates the whole stress test; when false the test is skipped.
realStress = flag.Bool( //nolint:gochecknoglobals // package-level state intentional
"olcrtc.stress",
false,
"run real provider stress matrix (bulk transfer + sustained echo) — requires -olcrtc.real-e2e",
)
// realStressBytes is the bulk-phase size; the phase is skipped when <= 0.
realStressBytes = flag.Int64( //nolint:gochecknoglobals // package-level state intentional
"olcrtc.stress-bytes",
8<<20, // 8 MiB
"bytes to stream through each carrier×transport in the stress bulk phase",
)
// realStressDuration is the echo-phase length; the phase is skipped when <= 0.
realStressDuration = flag.Duration( //nolint:gochecknoglobals // package-level state intentional
"olcrtc.stress-duration",
30*time.Second,
"per-case duration for the sustained echo phase (set 0 to skip)",
)
// realStressEchoSize is the payload size of one echo roundtrip; sustainedEcho
// raises values below 4 to 4.
realStressEchoSize = flag.Int( //nolint:gochecknoglobals // package-level state intentional
"olcrtc.stress-echo-size",
1024,
"single-roundtrip payload size during the sustained echo phase",
)
// realStressCaseTimeout bounds the per-case context, the SOCKS connect and
// the per-carrier room context.
realStressCaseTimeout = flag.Duration( //nolint:gochecknoglobals // package-level state intentional
"olcrtc.stress-case-timeout",
5*time.Minute,
"hard timeout per stress carrier×transport case (covers connect + bulk + echo)",
)
)
// TestRealProviderTransportStress runs the load matrix: one subtest per
// selected carrier, and inside it one subtest per selected transport. Each
// carrier×transport case is delegated to runRealE2EStressCase, which drives
// a bulk phase and a sustained echo phase over a single SOCKS connection.
//
// Outcome handling per case:
//   - combos whose baseline expectation is realE2EExpectFail are skipped;
//   - an enginebuiltin.ErrAuthFailed error skips the case and every later
//     transport of the same carrier;
//   - errors on combos marked realE2EExpectUnstable are reported through
//     logUnstableOutcome instead of failing;
//   - any other error fails the subtest.
//
// The test is skipped unless both -olcrtc.real-e2e and -olcrtc.stress are
// set, so it stays out of ordinary push/PR runs.
//
//nolint:cyclop // matrix of carrier×transport expectations is naturally branchy
func TestRealProviderTransportStress(t *testing.T) {
	switch {
	case !*realE2E:
		t.Skip("real provider e2e disabled; pass -olcrtc.real-e2e to enable")
	case !*realStress:
		t.Skip("stress disabled; pass -olcrtc.stress to enable")
	}

	carrierNames := splitTestList(*realE2ECarriers)
	transportNames := splitTestList(*realE2ETransports)
	if len(carrierNames) == 0 {
		t.Fatal("no real e2e carriers selected")
	}
	if len(transportNames) == 0 {
		t.Fatal("no real e2e transports selected")
	}

	echoAddr := startEchoServer(t)

	for _, carrierName := range carrierNames {
		t.Run(carrierName, func(t *testing.T) {
			roomCtx, cancelRoom := context.WithTimeout(context.Background(), *realStressCaseTimeout)
			defer cancelRoom()
			roomURL := requireRealRoom(roomCtx, t, carrierName)

			// Set once a transport hits an auth failure; later transports of
			// this carrier are skipped instead of retried.
			authFailed := false

			for _, transportName := range transportNames {
				t.Run(transportName, func(t *testing.T) {
					if authFailed {
						t.Skip("skipping: carrier auth failed on previous transport")
					}

					expectation := realE2ECaseExpectation(carrierName, transportName)
					if expectation == realE2EExpectFail {
						t.Skip("skipping: combo not expected to pass even at baseline")
					}

					err := runRealE2EStressCase(t, carrierName, transportName, roomURL, echoAddr)
					if err == nil {
						t.Logf("STRESS OK %s/%s", carrierName, transportName)
						return
					}
					if errors.Is(err, enginebuiltin.ErrAuthFailed) {
						authFailed = true
						t.Skipf("skip %s stress: auth failed: %v", carrierName, err)
					}
					if expectation == realE2EExpectUnstable {
						logUnstableOutcome(t, "STRESS UNSTABLE", carrierName, transportName, err)
						return
					}
					t.Fatalf("STRESS FAIL %s/%s: %v", carrierName, transportName, err)
				})
			}
		})
	}
}
// runRealE2EStressCase starts one real tunnel for carrierName/transportName,
// opens a single SOCKS connection through it to echoAddr and drives the two
// stress phases over that connection:
//
//   - bulk: streamPatternAndVerifyEcho with -olcrtc.stress-bytes (skipped when <= 0);
//   - echo: sustainedEcho for -olcrtc.stress-duration (skipped when <= 0).
//
// It returns the first error from tunnel start, SOCKS connect or either
// phase. When all of those succeed, a non-nil rt.stopErr() is returned
// instead so a failing shutdown is not lost.
//
// runtime.NumGoroutine is sampled before the tunnel starts and again after
// the deferred conn.Close, rt.stopErr and cancel calls have run; growth
// beyond a fixed slack is logged as a warning and never fails the case.
//
//nolint:cyclop // two phases plus tunnel/connection setup naturally branch
func runRealE2EStressCase(t *testing.T, carrierName, transportName, roomURL, echoAddr string) (err error) {
	t.Helper()

	goroutinesBefore := runtime.NumGoroutine()
	// Registered first so it runs last: the comparison must happen after the
	// connection and tunnel have been released, otherwise it just counts the
	// goroutines of a tunnel that is still running.
	defer logStressGoroutineGrowth(t, goroutinesBefore, carrierName, transportName)

	ctx, cancel := context.WithTimeout(context.Background(), *realStressCaseTimeout)
	defer cancel()

	// NOTE(review): testClientDeviceID is passed for both trailing arguments —
	// confirm that using the same ID twice is intended.
	rt, err := startRealTunnel(ctx, t, carrierName, transportName, roomURL, testClientDeviceID, testClientDeviceID)
	if err != nil {
		return err
	}
	defer func() {
		// Surface a shutdown error only when nothing failed earlier.
		if stopErr := rt.stopErr(); err == nil && stopErr != nil {
			err = stopErr
		}
	}()

	conn, err := connectViaSOCKSWithin(rt.socksAddr, echoAddr, *realStressCaseTimeout)
	if err != nil {
		return err
	}
	defer func() { _ = conn.Close() }() // best-effort: the case result is already decided

	if size := *realStressBytes; size > 0 {
		start := time.Now()
		if bulkErr := streamPatternAndVerifyEcho(conn, size); bulkErr != nil {
			return fmt.Errorf("bulk %d bytes: %w", size, bulkErr)
		}
		// Measure once so the logged duration and the throughput agree.
		elapsed := time.Since(start)
		throughput := float64(size) / elapsed.Seconds() / (1 << 20)
		t.Logf("bulk %s/%s: %d bytes in %s (%.2f MiB/s)",
			carrierName, transportName, size, elapsed, throughput)
	}

	if d := *realStressDuration; d > 0 {
		stats, echoErr := sustainedEcho(conn, *realStressEchoSize, d)
		if echoErr != nil {
			return fmt.Errorf("sustained echo: %w", echoErr)
		}
		t.Logf("echo %s/%s: %d rt in %s, p50=%s p95=%s p99=%s max=%s lost=%d",
			carrierName, transportName, stats.count, d,
			stats.p50, stats.p95, stats.p99, stats.maxLatency, stats.lost)
		if stats.count == 0 {
			return fmt.Errorf("%w: %s", errStressNoRoundtrips, d)
		}
	}

	return nil
}

// logStressGoroutineGrowth compares the current goroutine count with before
// and logs a warning when it stays above before plus a fixed slack. It polls
// for a short, bounded time first so helpers that are still winding down
// after shutdown are not reported as leaks.
func logStressGoroutineGrowth(t *testing.T, before int, carrierName, transportName string) {
	t.Helper()

	const (
		// Allow some slack — a real leak shows up as tens of extra goroutines.
		goroutineLeakSlack = 30
		settleTimeout      = 2 * time.Second
		settlePoll         = 50 * time.Millisecond
	)

	after := runtime.NumGoroutine()
	giveUp := time.Now().Add(settleTimeout)
	for after > before+goroutineLeakSlack && time.Now().Before(giveUp) {
		time.Sleep(settlePoll)
		after = runtime.NumGoroutine()
	}
	if after > before+goroutineLeakSlack {
		t.Logf("WARNING: goroutines grew %d -> %d during %s/%s",
			before, after, carrierName, transportName)
	}
}
// echoStats summarizes one sustainedEcho run.
type echoStats struct {
// count is the number of roundtrips that were written, read back and
// verified byte-for-byte.
count int
// lost is incremented when a write or a read fails.
lost int
// p50, p95, p99 are latency percentiles picked from the sorted per-roundtrip
// latencies; they stay zero when no roundtrip completed.
p50, p95, p99 time.Duration
// maxLatency is the largest single-roundtrip latency observed.
maxLatency time.Duration
}
// sustainedEcho repeatedly writes a fixed payload of payloadSize bytes to conn
// and reads the same number of bytes back, timing each roundtrip, until
// duration has elapsed or an operation fails.
//
// payloadSize values below 4 are raised to 4. Every write and every read gets
// its own 5-second deadline, so a stuck transport ends the run with an error
// instead of hanging. On a write/read failure stats.lost is incremented and
// the partial stats are returned together with the error; an echoed payload
// that differs from what was sent yields errStressPayloadMatch.
//
// On success the returned stats carry the roundtrip count, the maximum
// latency and the p50/p95/p99 latencies (left zero when no roundtrip ran).
//
//nolint:cyclop // per-rt deadlines + error wrapping naturally branch many ways
func sustainedEcho(conn net.Conn, payloadSize int, duration time.Duration) (echoStats, error) {
	const (
		perOpTimeout = 5 * time.Second
		alphabet     = "abcdefghijklmnopqrstuvwxyz"
	)

	payloadSize = max(payloadSize, 4)
	stopAt := time.Now().Add(duration)

	// Repeating a..z pattern with a trailing newline as the final byte.
	out := make([]byte, payloadSize)
	for i := range out {
		out[i] = alphabet[i%len(alphabet)]
	}
	out[len(out)-1] = '\n'

	var (
		stats   echoStats
		in      = bufio.NewReader(conn)
		echoed  = make([]byte, payloadSize)
		samples = make([]time.Duration, 0, 1024)
	)

	// roundtrip performs one write+read cycle and returns its latency.
	roundtrip := func() (time.Duration, error) {
		if err := conn.SetWriteDeadline(time.Now().Add(perOpTimeout)); err != nil {
			return 0, fmt.Errorf("set write deadline: %w", err)
		}
		sentAt := time.Now()
		if _, err := conn.Write(out); err != nil {
			stats.lost++
			return 0, fmt.Errorf("write at rt #%d: %w", stats.count, err)
		}
		if err := conn.SetReadDeadline(time.Now().Add(perOpTimeout)); err != nil {
			return 0, fmt.Errorf("set read deadline: %w", err)
		}
		if _, err := io.ReadFull(in, echoed); err != nil {
			stats.lost++
			return 0, fmt.Errorf("read at rt #%d: %w", stats.count, err)
		}
		took := time.Since(sentAt)
		if !bytes.Equal(echoed, out) {
			return 0, fmt.Errorf("%w at rt #%d", errStressPayloadMatch, stats.count)
		}
		return took, nil
	}

	for time.Now().Before(stopAt) {
		took, err := roundtrip()
		if err != nil {
			return stats, err
		}
		samples = append(samples, took)
		stats.maxLatency = max(stats.maxLatency, took)
		stats.count++
	}

	if n := len(samples); n > 0 {
		slices.Sort(samples)
		// Index n*pct/100, clamped to the last sample.
		at := func(pct int) time.Duration { return samples[min(n*pct/100, n-1)] }
		stats.p50, stats.p95, stats.p99 = at(50), at(95), at(99)
	}
	return stats, nil
}

View File

@@ -0,0 +1,339 @@
package jitsi
import (
"context"
"encoding/binary"
"fmt"
"math/rand/v2"
"sync"
"sync/atomic"
"testing"
"time"
"github.com/openlibrecommunity/olcrtc/internal/engine"
)
// TestReconnectWindowResetsAfterTimeWindow covers fix 5d4592f: once the
// reconnect window has elapsed, reconnectCount must restart from zero so the
// attempt cap is not consumed by attempts made long ago.
//
// The session is seeded with a window that started more than reconnectWindow
// ago and a counter already at maxReconnects. The attempt itself goes through
// simulateAttempt, which mirrors the window/counter bookkeeping without
// calling the real reconnect path.
func TestReconnectWindowResetsAfterTimeWindow(t *testing.T) {
	js := newChurnSession(t)
	defer func() { _ = js.Close() }()

	// A window that expired one second ago, with the cap already reached.
	expiredStart := time.Now().Add(-reconnectWindow - time.Second)
	js.reconnectMu.Lock()
	js.reconnectCount = maxReconnects
	js.reconnectWindowStart = expiredStart
	js.reconnectMu.Unlock()

	switch count, rolled := simulateAttempt(js); {
	case !rolled:
		t.Fatal("expected window rollover, got continuation of stale window")
	case count != 1:
		t.Fatalf("reconnectCount after rollover = %d, want 1", count)
	}
}
// TestReconnectWindowEnforcesCapWithinWindow covers the other half of fix
// 5d4592f: inside a single window, an attempt past the cap must end the
// session. The counters are seeded at maxReconnects with the window starting
// now, handleReconnectAttempt is invoked once, and the test waits up to two
// seconds for the ended callback to fire with a non-empty reason.
func TestReconnectWindowEnforcesCapWithinWindow(t *testing.T) {
	js := newChurnSession(t)
	defer func() { _ = js.Close() }()

	// Buffered by one; the callback drops any further notification rather
	// than blocking.
	reasons := make(chan string, 1)
	js.SetEndedCallback(func(reason string) {
		select {
		case reasons <- reason:
		default:
		}
	})

	// Window starts now, so the attempt below cannot trigger a rollover.
	js.reconnectMu.Lock()
	js.reconnectCount = maxReconnects
	js.reconnectWindowStart = time.Now()
	js.reconnectMu.Unlock()

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	result := make(chan bool, 1)
	go func() { result <- js.handleReconnectAttempt(ctx) }()

	guard := time.NewTimer(2 * time.Second)
	defer guard.Stop()
	select {
	case <-guard.C:
		t.Fatal("cap was not enforced within window")
	case reason := <-reasons:
		if len(reason) == 0 {
			t.Fatal("ended with empty reason")
		}
	}

	// Release the attempt goroutine and wait for it before tearing down.
	cancel()
	<-result
}
// TestResetPeerClearsBindingForNewPeer covers fix 032151b: after ResetPeer
// the previously latched endpoint must no longer block a different peer.
//
// Sequence: peerA delivers a frame, peerB's first frame is expected to be
// dropped, ResetPeer is called (peerEpoch must read 0 and peerEndpoint nil),
// then peerB's second frame must be delivered. Exactly two payloads are
// expected at onData: "from-A" followed by "from-B-allowed".
func TestResetPeerClearsBindingForNewPeer(t *testing.T) {
	js := newChurnSession(t)
	defer func() { _ = js.Close() }()

	var (
		mu  sync.Mutex
		got [][]byte
	)
	js.onData = func(b []byte) {
		mu.Lock()
		defer mu.Unlock()
		got = append(got, append([]byte(nil), b...))
	}
	js.localEpoch.Store(0xDEADBEEF)

	// send pushes one bridge frame from peer with the given sender epoch and
	// a receiver epoch of 0.
	send := func(peer string, senderEpoch uint32, body string) {
		frame := makeBridgeFrameForEpoch(t, senderEpoch, 0, []byte(body))
		msg := makeBridgeMessageFrom(peer, map[string]any{rawFieldKey: frame})
		js.deliverBridgeMessage(msg, true)
	}

	send("peerA", 0x1111, "from-A")         // A latches and delivers
	send("peerB", 0x2222, "from-B-blocked") // B is shut out while A holds the latch

	js.ResetPeer()
	if ep := js.peerEpoch.Load(); ep != 0 {
		t.Fatalf("peerEpoch after ResetPeer = %#x, want 0", ep)
	}
	if endpoint := js.peerEndpoint.Load(); endpoint != nil {
		t.Fatalf("peerEndpoint after ResetPeer = %q, want nil", *endpoint)
	}

	send("peerB", 0x2222, "from-B-allowed") // B may latch now

	mu.Lock()
	defer mu.Unlock()
	want := []string{"from-A", "from-B-allowed"}
	if len(got) != len(want) {
		t.Fatalf("delivered = %d frames, want 2 (from-A then from-B-allowed): %q", len(got), got)
	}
	for i, w := range want {
		if string(got[i]) != w {
			t.Fatalf("delivered = %q, want [from-A from-B-allowed]", got)
		}
	}
}
// TestChurnPeerEpochChanges hammers fix acac112 (epoch-based bridge frame
// filtering) under churn: eight goroutines each push 500 frames from the same
// peer, randomly choosing between
//   - a fresh sender epoch with receiverEpoch=0 (announce),
//   - a frame whose receiverEpoch does not match the local epoch (stale),
//   - a frame whose receiverEpoch equals the local epoch (ack).
//
// Assertions:
//   - no payload tagged "STALE" ever reaches onData;
//   - at least one frame is delivered, so the filter is not dropping everything.
//
// The reconnect channel is drained after every frame so signals cannot pile
// up; the test does not assert on them. Run with -race to catch unsynchronized
// access to peerEpoch / peerEndpoint.
func TestChurnPeerEpochChanges(t *testing.T) {
	js := newChurnSession(t)
	defer func() { _ = js.Close() }()

	js.localEpoch.Store(0x42424242)
	js.SetShouldReconnect(func() bool { return true })

	var delivered atomic.Uint64
	var staleDelivered atomic.Uint64
	js.onData = func(b []byte) {
		delivered.Add(1)
		// Stale frames in this test are tagged with the literal "STALE".
		if len(b) >= 5 && string(b[:5]) == "STALE" {
			staleDelivered.Add(1)
		}
	}

	const iterations = 500
	const goroutines = 8

	var wg sync.WaitGroup
	for g := range goroutines {
		seed := uint64(g) + 1
		wg.Go(func() {
			rng := rand.New(rand.NewPCG(seed, seed^0x9E3779B97F4A7C15)) //nolint:gosec // weak RNG is fine for test fixtures
			for i := range iterations {
				switch rng.IntN(3) {
				case 0:
					// Fresh epoch; receiverEpoch=0 acts as announce.
					// The low bit is cleared so the value can never equal
					// the odd fixed sender epochs (0x1111, 0x9999) used by
					// the other cases. Zero is replaced explicitly: ResetPeer
					// leaves peerEpoch at 0, so 0 presumably means "no epoch"
					// (confirm against the filter).
					ep := uint32(rng.Uint64()) &^ 1 //nolint:gosec // truncation is the intent
					if ep == 0 {
						ep = 2
					}
					payload := fmt.Appendf(nil, "ok-%d-%d", seed, i)
					raw := makeBridgeFrameForEpoch(t, ep, 0, payload)
					js.deliverBridgeMessage(
						makeBridgeMessageFrom("peerA",
							map[string]any{rawFieldKey: raw}), true)
				case 1:
					// Stale: receiverEpoch mismatched with local. Must be dropped.
					raw := makeBridgeFrameForEpoch(t, 0x1111, 0xBADBAD, []byte("STALE-rcv"))
					js.deliverBridgeMessage(
						makeBridgeMessageFrom("peerA",
							map[string]any{rawFieldKey: raw}), true)
				case 2:
					// Acknowledging local epoch: must pass.
					payload := fmt.Appendf(nil, "ack-%d-%d", seed, i)
					raw := makeBridgeFrameForEpoch(t, 0x9999, 0x42424242, payload)
					js.deliverBridgeMessage(
						makeBridgeMessageFrom("peerA",
							map[string]any{rawFieldKey: raw}), true)
				}
				drainReconnectCh(js)
			}
		})
	}
	wg.Wait()

	if staleDelivered.Load() != 0 {
		t.Fatalf("stale frames delivered: %d (filter regression)", staleDelivered.Load())
	}
	if delivered.Load() == 0 {
		t.Fatal("no frames delivered at all — filter is too aggressive")
	}
}
// TestChurnConcurrentResetAndDeliver races ResetPeer against concurrent
// deliverBridgeMessage calls from three peers for about 200ms. Delivered data
// is discarded and nothing is asserted on it: the value of the test is that
// it finishes without deadlock and stays clean under -race for peerEndpoint
// and peerEpoch.
func TestChurnConcurrentResetAndDeliver(t *testing.T) {
	js := newChurnSession(t)
	defer func() { _ = js.Close() }()

	js.localEpoch.Store(0x55555555)
	js.SetShouldReconnect(func() bool { return true })
	js.onData = func([]byte) {} // discard

	stop := make(chan struct{})
	// stopped reports, without blocking, whether stop has been closed.
	stopped := func() bool {
		select {
		case <-stop:
			return true
		default:
			return false
		}
	}

	var wg sync.WaitGroup

	// One producer per peer, each with its own fixed sender epoch.
	peers := []string{"peerA", "peerB", "peerC"}
	for i, peer := range peers {
		ep := uint32(0x1000 * (i + 1))
		wg.Go(func() {
			for !stopped() {
				raw := makeBridgeFrameForEpoch(t, ep, 0, []byte(peer))
				msg := makeBridgeMessageFrom(peer, map[string]any{rawFieldKey: raw})
				js.deliverBridgeMessage(msg, true)
				drainReconnectCh(js)
			}
		})
	}

	// Resetter: keeps clearing the latch while the producers run.
	wg.Go(func() {
		for !stopped() {
			js.ResetPeer()
			time.Sleep(50 * time.Microsecond)
		}
	})

	time.Sleep(200 * time.Millisecond)
	close(stop)
	wg.Wait()
}
// TestChurnReconnectAttemptSerial runs twenty back-to-back window rollovers
// through simulateAttempt (the test replica of the reconnect window/counter
// bookkeeping) while a second goroutine keeps reading the same counters under
// reconnectMu. Each iteration forces an expired window and expects the
// counter to restart at 1.
//
// The concurrent reader exists so that -race flags any access to the
// reconnect counters that bypasses the mutex. It is stopped and joined before
// the session is closed, including when an assertion fails.
func TestChurnReconnectAttemptSerial(t *testing.T) {
	js := newChurnSession(t)
	defer func() { _ = js.Close() }()

	stop := make(chan struct{})
	var reader sync.WaitGroup
	// Deferred after js.Close, so it runs first: the reader is stopped and
	// joined even when t.Fatalf aborts the loop, and never touches the
	// session after Close.
	defer func() {
		close(stop)
		reader.Wait()
	}()

	reader.Go(func() {
		// Reader: snapshots counters under the lock until told to stop.
		for {
			select {
			case <-stop:
				return
			default:
			}
			js.reconnectMu.Lock()
			_ = js.reconnectCount
			_ = js.reconnectWindowStart
			js.reconnectMu.Unlock()
		}
	})

	for i := range 20 {
		// Force rollover every iteration.
		js.reconnectMu.Lock()
		js.reconnectWindowStart = time.Now().Add(-reconnectWindow - time.Second)
		js.reconnectCount = 0
		js.reconnectMu.Unlock()

		count, rolled := simulateAttempt(js)
		if !rolled {
			t.Fatalf("iter %d: expected rollover", i)
		}
		if count != 1 {
			t.Fatalf("iter %d: count after rollover = %d, want 1", i, count)
		}
	}
}
// --- helpers ---
// newChurnSession builds a jitsi Session through New using the package's test
// host and room, and fails the test if construction fails or the returned
// value is not a *Session.
func newChurnSession(t *testing.T) *Session {
	t.Helper()

	cfg := engine.Config{
		URL:   testHost,
		Extra: map[string]string{credentialKeyRoom: testRoom},
	}
	sess, err := New(context.Background(), cfg)
	if err != nil {
		t.Fatalf("New: %v", err)
	}
	if js, ok := sess.(*Session); ok {
		return js
	}
	t.Fatal("sess is not *Session")
	return nil // unreachable: t.Fatal stops the test goroutine
}
// simulateAttempt performs the window-and-counter bookkeeping of one reconnect
// attempt under reconnectMu, without calling reconnect() (which would touch
// real network state). If the window start is unset or older than
// reconnectWindow, the window restarts now and the counter is zeroed; the
// counter is then incremented.
//
// It returns the counter after the increment and whether the window rolled.
func simulateAttempt(js *Session) (int, bool) {
	now := time.Now()

	js.reconnectMu.Lock()
	defer js.reconnectMu.Unlock()

	expired := js.reconnectWindowStart.IsZero() ||
		now.Sub(js.reconnectWindowStart) > reconnectWindow
	if expired {
		js.reconnectWindowStart, js.reconnectCount = now, 0
	}
	js.reconnectCount++
	return js.reconnectCount, expired
}
// drainReconnectCh removes at most one pending value from js.reconnectCh and
// never blocks when the channel is empty.
func drainReconnectCh(js *Session) {
	select {
	default:
		// Nothing pending.
	case <-js.reconnectCh:
		// One pending signal discarded.
	}
}
// Blank reference that keeps the encoding/binary import in use, so this file
// still compiles when nothing else in it references the package.
var _ = binary.BigEndian

View File

@@ -0,0 +1,150 @@
package common_test
import (
"bytes"
"hash/crc32"
"math/rand/v2"
"sync"
"testing"
"github.com/openlibrecommunity/olcrtc/internal/transport/common"
)
// TestReassemblerStressShuffledFragments feeds one reassembler the fragments
// of 200 random messages in a single globally shuffled stream, with roughly
// 20% of the fragments injected a second time. Fragments of different
// sequence numbers are therefore interleaved and arrive out of order.
//
// Checks:
//   - no sequence number is reported as delivered twice;
//   - every message is delivered and matches its original bytes;
//   - at least one Push reports ResultDuplicate, proving the duplicate path ran.
//
// The RNG is seeded with fixed values, so the stream is reproducible.
//
//nolint:cyclop // stress fixture intentionally exercises many branches in one test
func TestReassemblerStressShuffledFragments(t *testing.T) {
	if testing.Short() {
		t.Skip("skipping stress test in -short mode")
	}

	const (
		messages = 200
		fragSize = 64
	)

	// sentMessage is what the test remembers about each message it fragmented.
	type sentMessage struct {
		seq       uint32
		payload   []byte
		fragCount int
	}

	r := common.NewReassembler(messages * 2)
	rng := rand.New(rand.NewPCG(0xC0FFEE, 0xDEADBEEF)) //nolint:gosec // weak RNG is fine for test fixtures

	sent := make([]sentMessage, 0, messages)
	var stream, dupes []common.Fragment
	for n := range messages {
		body := make([]byte, 50+rng.IntN(2000))
		for k := range body {
			body[k] = byte(rng.Uint32()) //nolint:gosec // truncation is the intent
		}
		seq := uint32(n + 1)
		sum := crc32.ChecksumIEEE(body)
		pieces := common.FragmentPayload(body, fragSize)

		for idx, piece := range pieces {
			frag := common.Fragment{
				Seq:       seq,
				CRC:       sum,
				TotalLen:  uint32(len(body)), //nolint:gosec // test fixture, bounded
				FragIdx:   uint16(idx),
				FragTotal: uint16(len(pieces)), //nolint:gosec // bounded
				Payload:   piece,
			}
			stream = append(stream, frag)
			// 20% duplicate injection
			if rng.Float64() < 0.20 {
				dupes = append(dupes, frag)
			}
		}
		sent = append(sent, sentMessage{seq: seq, payload: body, fragCount: len(pieces)})
	}

	// Originals first, duplicates after, then one global shuffle.
	stream = append(stream, dupes...)
	rng.Shuffle(len(stream), func(a, b int) { stream[a], stream[b] = stream[b], stream[a] })

	delivered := make(map[uint32][]byte, messages)
	dupCount := 0
	for _, frag := range stream {
		res, data := r.Push(frag)
		switch res {
		case common.ResultPartial, common.ResultIgnore:
			// expected
		case common.ResultDuplicate:
			dupCount++
		case common.ResultDelivered:
			if prev, seen := delivered[frag.Seq]; seen {
				// Re-delivery would be a logic error.
				t.Fatalf("seq %d delivered twice (was %d bytes, now %d)", frag.Seq, len(prev), len(data))
			}
			delivered[frag.Seq] = append([]byte(nil), data...)
		}
	}

	for _, m := range sent {
		got, ok := delivered[m.seq]
		switch {
		case !ok:
			t.Fatalf("seq %d never delivered (had %d fragments)", m.seq, m.fragCount)
		case !bytes.Equal(got, m.payload):
			t.Fatalf("seq %d payload mismatch: got %d bytes, want %d", m.seq, len(got), len(m.payload))
		}
	}
	if dupCount == 0 {
		t.Fatal("test injected duplicates but reassembler reported none — duplicate path not exercised")
	}
	t.Logf("delivered %d/%d messages, observed %d duplicates", len(delivered), messages, dupCount)
}
// TestReassemblerConcurrentPushIsSafe has 16 goroutines push the fragments of
// 50 messages each into one shared reassembler. Every writer owns a disjoint
// range of sequence numbers and pushes each message's fragments in a random
// permutation.
//
// Besides finishing without deadlock (and cleanly under -race), the test
// checks that every payload reported as ResultDelivered equals the bytes that
// were fragmented, and that all writers*perWriter messages were delivered.
func TestReassemblerConcurrentPushIsSafe(t *testing.T) {
	if testing.Short() {
		t.Skip("skipping concurrent stress test in -short mode")
	}
	const writers = 16
	const perWriter = 50
	r := common.NewReassembler(writers * perWriter * 2)

	// One counter per writer; each goroutine touches only its own slot and
	// the slots are summed after wg.Wait, so no further synchronization is
	// needed.
	delivered := make([]int, writers)

	var wg sync.WaitGroup
	for w := range writers {
		base := uint32(w * perWriter)
		wg.Go(func() {
			rng := rand.New(rand.NewPCG(uint64(w)+1, 0xC0DE)) //nolint:gosec // test seed
			for i := range perWriter {
				size := 30 + rng.IntN(500)
				p := make([]byte, size)
				for j := range p {
					p[j] = byte(rng.Uint32()) //nolint:gosec // truncation is the intent
				}
				seq := base + uint32(i) + 1
				crc := crc32.ChecksumIEEE(p)
				raw := common.FragmentPayload(p, 32)
				idxs := rng.Perm(len(raw))
				for _, idx := range idxs {
					res, data := r.Push(common.Fragment{
						Seq:       seq,
						CRC:       crc,
						TotalLen:  uint32(len(p)),   //nolint:gosec // bounded
						FragIdx:   uint16(idx),      //nolint:gosec // bounded
						FragTotal: uint16(len(raw)), //nolint:gosec // bounded
						Payload:   raw[idx],
					})
					if res != common.ResultDelivered {
						continue
					}
					// t.Errorf (not Fatalf): this runs outside the test goroutine.
					if !bytes.Equal(data, p) {
						t.Errorf("seq %d: delivered payload differs from pushed payload (got %d bytes, want %d)",
							seq, len(data), len(p))
					}
					delivered[w]++
				}
			}
		})
	}
	wg.Wait()

	total := 0
	for _, n := range delivered {
		total += n
	}
	if want := writers * perWriter; total != want {
		t.Fatalf("delivered %d/%d messages under concurrent Push", total, want)
	}
}

View File

@@ -0,0 +1,278 @@
package vp8channel
import (
"bytes"
"math/rand/v2"
"sync"
"sync/atomic"
"testing"
"time"
)
// chaosCfg selects the network pathology that chaosPump injects between two
// kcpRuntimes. Loss, reorder and latency are each disabled when left at
// their zero value, so the zero chaosCfg passes packets straight through.
type chaosCfg struct {
lossRatio float64 // 0..1 probability of dropping a packet
reorderRatio float64 // 0..1 probability of delaying a packet by `reorderHold`
reorderHold time.Duration // hold-and-release delay for reordered packets
latency time.Duration // base one-way latency applied to every packet
seed uint64 // RNG seed; 0 picks 1
}
// chaosPump reads packets from `from` and forwards them into to.deliver,
// injecting the pathology described by cfg on the way. It is meant to stand
// in for pumpPackets between two kcpRuntimes and runs until stop is closed.
//
// For every packet, in this order:
//   - with probability cfg.lossRatio the packet is dropped and, when dropped
//     is non-nil, counted there;
//   - cfg.latency (if positive) is added to the packet's delay;
//   - with probability cfg.reorderRatio, cfg.reorderHold is added as well;
//   - a packet with no delay is forwarded immediately; a delayed packet is
//     queued and released by a 2ms ticker once its release time has passed.
//
// Delays never block the loop, so stop and the release ticker stay responsive
// and latency behaves as a per-packet delay, not a serial bottleneck.
// Packets no longer than epochHdrLen are discarded; otherwise the first
// epochHdrLen bytes are stripped before delivery. Packets still queued when
// stop closes are dropped.
//
//nolint:cyclop // chaos pump intentionally has several independent injection paths
func chaosPump(
	t *testing.T,
	stop <-chan struct{},
	from <-chan []byte,
	to *kcpRuntime,
	cfg chaosCfg,
	dropped *atomic.Uint64,
) {
	t.Helper()
	seed := cfg.seed
	if seed == 0 {
		seed = 1
	}
	rng := rand.New(rand.NewPCG(seed, seed^0x9E3779B97F4A7C15)) //nolint:gosec // weak RNG is fine for test fixtures

	// Delayed packets waiting for their release time.
	type held struct {
		release time.Time
		pkt     []byte
	}
	// holdQ is only touched from this goroutine today; the mutex keeps it
	// safe should the release path ever move to a separate goroutine.
	var holdMu sync.Mutex
	var holdQ []held
	releaseTick := time.NewTicker(2 * time.Millisecond)
	defer releaseTick.Stop()

	forward := func(p []byte) {
		if len(p) > epochHdrLen {
			to.deliver(p[epochHdrLen:])
		}
	}

	for {
		select {
		case <-stop:
			return
		case <-releaseTick.C:
			holdMu.Lock()
			now := time.Now()
			// In-place filter: due packets are forwarded in queue order, the
			// rest are kept.
			kept := holdQ[:0]
			for _, h := range holdQ {
				if !now.Before(h.release) {
					forward(h.pkt)
					continue
				}
				kept = append(kept, h)
			}
			holdQ = kept
			holdMu.Unlock()
		case pkt := <-from:
			pkt = append([]byte(nil), pkt...) // detach from sender buffer
			if cfg.lossRatio > 0 && rng.Float64() < cfg.lossRatio {
				if dropped != nil {
					dropped.Add(1)
				}
				continue
			}

			var delay time.Duration
			if cfg.latency > 0 {
				delay += cfg.latency
			}
			// The reorder draw is only taken when reordering is enabled, so
			// seeded runs without reordering consume the same RNG sequence.
			if cfg.reorderRatio > 0 && cfg.reorderHold > 0 && rng.Float64() < cfg.reorderRatio {
				delay += cfg.reorderHold
			}
			if delay <= 0 {
				forward(pkt)
				continue
			}
			holdMu.Lock()
			holdQ = append(holdQ, held{release: time.Now().Add(delay), pkt: pkt})
			holdMu.Unlock()
		}
	}
}
// runChaosLoopback connects two KCP runtimes A and B through chaosPump, sends
// msgs from A and waits until B's receiver signals completion or timeout
// elapses (which fails the test). The A→B direction uses cfg; the B→A
// direction always uses the zero chaosCfg so acknowledgements travel over a
// clean path.
//
// The received messages are verified with checkMessages. It returns the time
// from just before the first send until completion, and the number of packets
// the A→B pump dropped.
func runChaosLoopback(t *testing.T, msgs [][]byte, cfg chaosCfg, timeout time.Duration) (time.Duration, uint64) {
	t.Helper()

	const queueDepth = 1024
	a2b := make(chan []byte, queueDepth)
	b2a := make(chan []byte, queueDepth)

	onRecv, allReceived, received := buildReceiver(len(msgs))

	sender, err := startKCP(a2b, nil, testEpochHdr(1))
	if err != nil {
		t.Fatalf("startKCP A: %v", err)
	}
	defer sender.close()

	receiver, err := startKCP(b2a, onRecv, testEpochHdr(2))
	if err != nil {
		t.Fatalf("startKCP B: %v", err)
	}
	defer receiver.close()

	stop := make(chan struct{})
	defer close(stop)

	var lostForward, lostReturn atomic.Uint64
	go chaosPump(t, stop, a2b, receiver, cfg, &lostForward)
	// Clean return path: loss in one direction is enough to stress, and the
	// drop count stays attributable to the forward direction.
	go chaosPump(t, stop, b2a, sender, chaosCfg{}, &lostReturn)

	began := time.Now()
	for i := range msgs {
		if err := sender.send(msgs[i]); err != nil {
			t.Fatalf("send: %v", err)
		}
	}

	expiry := time.NewTimer(timeout)
	defer expiry.Stop()
	select {
	case <-expiry.C:
		got := received()
		t.Fatalf("timeout: got %d/%d messages, dropped A->B=%d", len(got), len(msgs), lostForward.Load())
	case <-allReceived:
	}
	elapsed := time.Since(began)

	checkMessages(t, received(), msgs)
	return elapsed, lostForward.Load()
}
// TestKCPSurvivesModeratePacketLoss confirms KCP's ARQ delivers all
// messages despite ~10% packet loss. This is the headline regression
// guard: if anything in vp8channel's KCP wiring (window size, retransmit
// pacing, conv stability) regresses, this test will flake or time out.
func TestKCPSurvivesModeratePacketLoss(t *testing.T) {
if testing.Short() {
t.Skip("skipping chaos test in -short mode")
}
msgs := [][]byte{
[]byte("alpha"),
bytes.Repeat([]byte("B"), 2000),
bytes.Repeat([]byte("C"), 8000),
bytes.Repeat([]byte("D"), 20000),
}
dur, dropped := runChaosLoopback(t, msgs, chaosCfg{lossRatio: 0.10, seed: 0xC0FFEE}, 20*time.Second)
t.Logf("delivered %d msgs in %s with %d packets dropped (10%% loss)", len(msgs), dur, dropped)
if dropped == 0 {
t.Fatal("chaos pump did not drop any packets — loss injection broken")
}
}
// TestKCPSurvivesReorder confirms KCP delivers messages in order even when
// ~20% of packets are arbitrarily held and re-released. videochannel does
// NOT tolerate this (it uses sequence+CRC reassembly that drops on reorder),
// but KCP under vp8channel must.
func TestKCPSurvivesReorder(t *testing.T) {
if testing.Short() {
t.Skip("skipping chaos test in -short mode")
}
msgs := [][]byte{
bytes.Repeat([]byte("R"), 4000),
bytes.Repeat([]byte("S"), 12000),
bytes.Repeat([]byte("T"), 30000),
}
dur, _ := runChaosLoopback(t, msgs, chaosCfg{
reorderRatio: 0.20,
reorderHold: 30 * time.Millisecond,
seed: 0xBEEF,
}, 15*time.Second)
t.Logf("reorder-tolerant delivery in %s", dur)
}
// TestKCPRecoversFromBurstLoss models a short connectivity blip: both
// directions drop every packet while a blackout flag is set. Three messages
// are sent during the blackout, the flag is lifted after 200ms, and all
// messages must then arrive in order within 15 seconds.
//
//nolint:cyclop // setup + gated pump + assertions naturally branch several ways
func TestKCPRecoversFromBurstLoss(t *testing.T) {
	if testing.Short() {
		t.Skip("skipping chaos test in -short mode")
	}

	var msgs [][]byte
	for _, fill := range []string{"X", "Y", "Z"} {
		msgs = append(msgs, bytes.Repeat([]byte(fill), 1500))
	}

	a2b := make(chan []byte, 1024)
	b2a := make(chan []byte, 1024)
	onRecv, allReceived, received := buildReceiver(len(msgs))

	sender, err := startKCP(a2b, nil, testEpochHdr(1))
	if err != nil {
		t.Fatalf("startKCP A: %v", err)
	}
	defer sender.close()

	receiver, err := startKCP(b2a, onRecv, testEpochHdr(2))
	if err != nil {
		t.Fatalf("startKCP B: %v", err)
	}
	defer receiver.close()

	stop := make(chan struct{})
	defer close(stop)

	var blackout atomic.Bool
	// pump forwards packets from src into dst unless the blackout is active,
	// in which case it discards them. Packets no longer than epochHdrLen are
	// discarded too.
	pump := func(src <-chan []byte, dst *kcpRuntime) {
		for {
			select {
			case <-stop:
				return
			case pkt := <-src:
				if !blackout.Load() && len(pkt) > epochHdrLen {
					dst.deliver(pkt[epochHdrLen:])
				}
			}
		}
	}
	go pump(a2b, receiver)
	go pump(b2a, sender)

	// Blackout on, send everything, hold, then restore the path.
	blackout.Store(true)
	for i := range msgs {
		if err := sender.send(msgs[i]); err != nil {
			t.Fatalf("send: %v", err)
		}
	}
	time.Sleep(200 * time.Millisecond)
	blackout.Store(false)

	recovery := time.NewTimer(15 * time.Second)
	defer recovery.Stop()
	select {
	case <-recovery.C:
		got := received()
		t.Fatalf("did not recover from blackout: got %d/%d", len(got), len(msgs))
	case <-allReceived:
	}
	checkMessages(t, received(), msgs)
}
// TestKCPThroughputBaseline pushes 50 messages of 8000 bytes through a
// loopback with no injected pathology and logs the resulting throughput in
// MiB/s. Nothing is asserted on the number itself; it is a reference point
// for spotting changes in KCP options on the same hardware. Delivery within
// 30 seconds is still enforced by runChaosLoopback.
func TestKCPThroughputBaseline(t *testing.T) {
	if testing.Short() {
		t.Skip("skipping throughput baseline in -short mode")
	}

	const (
		payloadSize = 8000
		messages    = 50
		total       = messages * payloadSize
	)

	msgs := make([][]byte, 0, messages)
	for i := 0; i < messages; i++ {
		fill := byte('A' + (i % 26))
		msgs = append(msgs, bytes.Repeat([]byte{fill}, payloadSize))
	}

	elapsed, _ := runChaosLoopback(t, msgs, chaosCfg{}, 30*time.Second)
	mibPerSec := float64(total) / elapsed.Seconds() / (1 << 20)
	t.Logf("baseline: %d bytes in %s = %.2f MiB/s", total, elapsed, mibPerSec)
}