From 33cccbc906fcd96c28160b07837b95e8b5fffe72 Mon Sep 17 00:00:00 2001 From: zarazaex69 Date: Sun, 17 May 2026 05:07:00 +0300 Subject: [PATCH] fix(e2e): pace stress bulk echo by chunk roundtrip --- internal/e2e/stress_test.go | 126 ++++++++++++++---------------------- 1 file changed, 47 insertions(+), 79 deletions(-) diff --git a/internal/e2e/stress_test.go b/internal/e2e/stress_test.go index f5a2a34..3d59f7e 100644 --- a/internal/e2e/stress_test.go +++ b/internal/e2e/stress_test.go @@ -52,6 +52,11 @@ var ( 5*time.Minute, "hard timeout per stress carrier×transport case (covers connect + bulk + echo)", ) + realStressBulkChunkSize = flag.Int( //nolint:gochecknoglobals // package-level state intentional + "olcrtc.stress-bulk-chunk", + 4096, + "bulk request-response chunk size in bytes", + ) ) // TestRealProviderTransportStress exercises every real carrier×transport @@ -152,7 +157,7 @@ func runRealE2EStressCase(t *testing.T, carrierName, transportName, roomURL, ech defer func() { _ = conn.Close() }() if d := *realStressBulkDuration; d > 0 { - written, dur, err := streamPatternForDuration(conn, d) + written, dur, err := streamPatternForDuration(conn, d, *realStressBulkChunkSize) if err != nil { return fmt.Errorf("bulk pump: %w", err) } @@ -190,96 +195,59 @@ func runRealE2EStressCase(t *testing.T, carrierName, transportName, roomURL, ech } // streamPatternForDuration pumps a deterministic byte pattern through conn -// for at most `duration`, reading the echoed bytes back and verifying they -// match. Returns total bytes successfully echoed and the elapsed time. +// for at most `duration` using a synchronous request-response loop: write a +// chunk, wait until the same chunk echoes back and verify, then write the +// next one. Returns total bytes successfully echoed and elapsed time. // -// Unlike streamPatternAndVerifyEcho (which fixes the size budget upfront), -// this variant respects a wall-clock deadline. That matters because -// transport throughputs differ by orders of magnitude — a fixed byte budget -// either takes seconds (datachannel) or runs past any sane CI timeout -// (videochannel). We stop the writer at the deadline and wait for the -// reader to drain in-flight bytes within a short grace window. -// -//nolint:cyclop,gocognit // two cooperating loops + deadlines naturally branch -func streamPatternForDuration(conn net.Conn, duration time.Duration) (int64, time.Duration, error) { - const chunkSize = 4096 - const drainGrace = 3 * time.Second +// Why request-response rather than concurrent write+read streams: +// transport throughputs differ by ~3 orders of magnitude (datachannel does +// MiB/s; videochannel/seichannel ~25 KB/s through 256-byte qr-encoded +// frames at 25 FPS). An asynchronous writer outruns a slow transport, +// fills muxconn / SOCKS / RTP-track buffers, and the deadlocked pipe +// eventually trips a TCP-write deadline — which is not a real bug, just +// the natural consequence of pumping into a slow pipe with no flow +// control. Request-response naturally rate-limits to the transport's +// actual round-trip throughput, which is what we want to measure. +func streamPatternForDuration(conn net.Conn, duration time.Duration, chunkSize int) (int64, time.Duration, error) { + if chunkSize <= 0 { + chunkSize = 4096 + } + // Per-chunk roundtrip deadline. Slow transports (videochannel) can + // take seconds+ per chunk in practice; 15s gives ample margin + // without making genuine stalls hang forever. + const chunkTimeout = 15 * time.Second start := time.Now() - writeDeadline := start.Add(duration) - - writeDone := make(chan struct{}) - writeErr := make(chan error, 1) - var writtenTotal int64 - - go func() { - defer close(writeDone) - buf := make([]byte, chunkSize) - var written int64 - for time.Now().Before(writeDeadline) { - fillPattern(buf, written) - if err := conn.SetWriteDeadline(time.Now().Add(5 * time.Second)); err != nil { - writeErr <- fmt.Errorf("set write deadline at %d: %w", written, err) - return - } - n, err := conn.Write(buf) - written += int64(n) - if err != nil { - writeErr <- fmt.Errorf("write at %d: %w", written, err) - return - } - } - writtenTotal = written - writeErr <- nil - }() + deadline := start.Add(duration) buf := make([]byte, chunkSize) + echoed := make([]byte, chunkSize) want := make([]byte, chunkSize) - var read int64 - for { - // Reader stops once it has consumed everything the writer produced - // (within drainGrace after writer finishes). - select { - case <-writeDone: - if read >= writtenTotal && writtenTotal > 0 { - if err := <-writeErr; err != nil { - return read, time.Since(start), err - } - return read, time.Since(start), nil - } - default: - } - readDeadline := time.Now().Add(5 * time.Second) - if !time.Now().Before(writeDeadline) { - // Writer phase done; allow a short grace window for the tunnel - // to drain bytes already in flight. - readDeadline = time.Now().Add(drainGrace) + reader := bufio.NewReader(conn) + var total int64 + + for time.Now().Before(deadline) { + fillPattern(buf, total) + if err := conn.SetWriteDeadline(time.Now().Add(chunkTimeout)); err != nil { + return total, time.Since(start), fmt.Errorf("set write deadline at %d: %w", total, err) } - if err := conn.SetReadDeadline(readDeadline); err != nil { - return read, time.Since(start), fmt.Errorf("set read deadline: %w", err) + if _, err := conn.Write(buf); err != nil { + return total, time.Since(start), fmt.Errorf("write at %d: %w", total, err) } - n, err := io.ReadFull(conn, buf) - if err != nil { - // Reader timed out after writer is done — that's clean drain end. - select { - case <-writeDone: - if read > 0 { - if werr := <-writeErr; werr != nil { - return read, time.Since(start), werr - } - return read, time.Since(start), nil - } - default: - } - return read, time.Since(start), fmt.Errorf("read at %d: %w", read, err) + if err := conn.SetReadDeadline(time.Now().Add(chunkTimeout)); err != nil { + return total, time.Since(start), fmt.Errorf("set read deadline at %d: %w", total, err) } - fillPattern(want[:n], read) - if !bytes.Equal(buf[:n], want[:n]) { - return read, time.Since(start), fmt.Errorf("%w %d", errPayloadMismatchOffset, read) + if _, err := io.ReadFull(reader, echoed); err != nil { + return total, time.Since(start), fmt.Errorf("read at %d: %w", total, err) } - read += int64(n) + fillPattern(want, total) + if !bytes.Equal(echoed, want) { + return total, time.Since(start), fmt.Errorf("%w %d", errPayloadMismatchOffset, total) + } + total += int64(chunkSize) } + return total, time.Since(start), nil } type echoStats struct {