From 7657b3c7b216245814d5c8adf2ec1fbbb061d310 Mon Sep 17 00:00:00 2001 From: zarazaex69 Date: Sun, 17 May 2026 00:28:37 +0300 Subject: [PATCH] test(e2e): time-box stress bulk phase by duration --- .github/workflows/ci.yml | 2 +- internal/e2e/stress_test.go | 133 ++++++++++++++++++++++++++++++++---- 2 files changed, 119 insertions(+), 16 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 7b08e2a..9e026e5 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -112,7 +112,7 @@ jobs: -olcrtc.real-e2e \ -olcrtc.stress \ -olcrtc.real-carriers=telemost,wbstream,jazz,jitsi \ - -olcrtc.stress-bytes=16777216 \ + -olcrtc.stress-bulk-duration=90s \ -olcrtc.stress-duration=120s \ -olcrtc.stress-echo-size=1024 \ -olcrtc.stress-case-timeout=8m diff --git a/internal/e2e/stress_test.go b/internal/e2e/stress_test.go index 98637c2..f5a2a34 100644 --- a/internal/e2e/stress_test.go +++ b/internal/e2e/stress_test.go @@ -18,8 +18,9 @@ import ( ) var ( - errStressNoRoundtrips = errors.New("no successful roundtrips within duration") - errStressPayloadMatch = errors.New("payload mismatch") + errStressNoRoundtrips = errors.New("no successful roundtrips within duration") + errStressPayloadMatch = errors.New("payload mismatch") + errStressNoBulkProgress = errors.New("bulk pump made zero progress") ) var ( @@ -28,10 +29,13 @@ var ( false, "run real provider stress matrix (bulk transfer + sustained echo) — requires -olcrtc.real-e2e", ) - realStressBytes = flag.Int64( //nolint:gochecknoglobals // package-level state intentional - "olcrtc.stress-bytes", - 8<<20, // 8 MiB - "bytes to stream through each carrier×transport in the stress bulk phase", + realStressBulkDuration = flag.Duration( //nolint:gochecknoglobals // package-level state intentional + "olcrtc.stress-bulk-duration", + 60*time.Second, + "per-case duration for the bulk pattern-pump phase (set 0 to skip). "+ + "Throughput differs by ~3 orders of magnitude across transports "+ + "(datachannel: MiB/s; videochannel: KB/s), so we measure how much "+ + "flows in a fixed time rather than fixing the byte budget.", ) realStressDuration = flag.Duration( //nolint:gochecknoglobals // package-level state intentional "olcrtc.stress-duration", @@ -54,8 +58,11 @@ var ( // combination under load. For each pair, two phases run sequentially over // a single SOCKS connection: // -// 1. Bulk phase: stream -olcrtc.stress-bytes through the tunnel and verify -// a deterministic pattern echoes back byte-for-byte. +// 1. Bulk phase: stream a deterministic byte pattern through the tunnel +// for -olcrtc.stress-bulk-duration and verify it echoes back byte-for- +// byte. Reports observed throughput. Different transports differ by +// orders of magnitude (qr-encoded videochannel vs SCTP datachannel), +// so we measure rather than assert a fixed budget. // 2. Echo phase: send -olcrtc.stress-echo-size payloads as fast as the // loop will go for -olcrtc.stress-duration, recording per-RT latency // and computing p50/p95/p99. @@ -144,14 +151,17 @@ func runRealE2EStressCase(t *testing.T, carrierName, transportName, roomURL, ech } defer func() { _ = conn.Close() }() - if size := *realStressBytes; size > 0 { - start := time.Now() - if err := streamPatternAndVerifyEcho(conn, size); err != nil { - return fmt.Errorf("bulk %d bytes: %w", size, err) + if d := *realStressBulkDuration; d > 0 { + written, dur, err := streamPatternForDuration(conn, d) + if err != nil { + return fmt.Errorf("bulk pump: %w", err) + } + throughput := float64(written) / dur.Seconds() / (1 << 20) + t.Logf("bulk %s/%s: %d bytes in %s (%.3f MiB/s)", + carrierName, transportName, written, dur, throughput) + if written == 0 { + return errStressNoBulkProgress } - throughput := float64(size) / time.Since(start).Seconds() / (1 << 20) - t.Logf("bulk %s/%s: %d bytes in %s (%.2f MiB/s)", - carrierName, transportName, size, time.Since(start), throughput) } if d := *realStressDuration; d > 0 { @@ -179,6 +189,99 @@ func runRealE2EStressCase(t *testing.T, carrierName, transportName, roomURL, ech return nil } +// streamPatternForDuration pumps a deterministic byte pattern through conn +// for at most `duration`, reading the echoed bytes back and verifying they +// match. Returns total bytes successfully echoed and the elapsed time. +// +// Unlike streamPatternAndVerifyEcho (which fixes the size budget upfront), +// this variant respects a wall-clock deadline. That matters because +// transport throughputs differ by orders of magnitude — a fixed byte budget +// either takes seconds (datachannel) or runs past any sane CI timeout +// (videochannel). We stop the writer at the deadline and wait for the +// reader to drain in-flight bytes within a short grace window. +// +//nolint:cyclop,gocognit // two cooperating loops + deadlines naturally branch +func streamPatternForDuration(conn net.Conn, duration time.Duration) (int64, time.Duration, error) { + const chunkSize = 4096 + const drainGrace = 3 * time.Second + + start := time.Now() + writeDeadline := start.Add(duration) + + writeDone := make(chan struct{}) + writeErr := make(chan error, 1) + var writtenTotal int64 + + go func() { + defer close(writeDone) + buf := make([]byte, chunkSize) + var written int64 + for time.Now().Before(writeDeadline) { + fillPattern(buf, written) + if err := conn.SetWriteDeadline(time.Now().Add(5 * time.Second)); err != nil { + writeErr <- fmt.Errorf("set write deadline at %d: %w", written, err) + return + } + n, err := conn.Write(buf) + written += int64(n) + if err != nil { + writeErr <- fmt.Errorf("write at %d: %w", written, err) + return + } + } + writtenTotal = written + writeErr <- nil + }() + + buf := make([]byte, chunkSize) + want := make([]byte, chunkSize) + var read int64 + for { + // Reader stops once it has consumed everything the writer produced + // (within drainGrace after writer finishes). + select { + case <-writeDone: + if read >= writtenTotal && writtenTotal > 0 { + if err := <-writeErr; err != nil { + return read, time.Since(start), err + } + return read, time.Since(start), nil + } + default: + } + + readDeadline := time.Now().Add(5 * time.Second) + if !time.Now().Before(writeDeadline) { + // Writer phase done; allow a short grace window for the tunnel + // to drain bytes already in flight. + readDeadline = time.Now().Add(drainGrace) + } + if err := conn.SetReadDeadline(readDeadline); err != nil { + return read, time.Since(start), fmt.Errorf("set read deadline: %w", err) + } + n, err := io.ReadFull(conn, buf) + if err != nil { + // Reader timed out after writer is done — that's clean drain end. + select { + case <-writeDone: + if read > 0 { + if werr := <-writeErr; werr != nil { + return read, time.Since(start), werr + } + return read, time.Since(start), nil + } + default: + } + return read, time.Since(start), fmt.Errorf("read at %d: %w", read, err) + } + fillPattern(want[:n], read) + if !bytes.Equal(buf[:n], want[:n]) { + return read, time.Since(start), fmt.Errorf("%w %d", errPayloadMismatchOffset, read) + } + read += int64(n) + } +} + type echoStats struct { count int lost int