From fb5ce90e612b4fca2ec3a2f3c9cdd6f35e586200 Mon Sep 17 00:00:00 2001 From: zarazaex69 Date: Fri, 22 May 2026 20:51:46 +0300 Subject: [PATCH] refactor(e2e): extract soak pump helpers and modernize range lnt --- internal/crypto/chacha_test.go | 6 +- internal/e2e/local_soak_test.go | 178 +++++++++++++++++++------------- 2 files changed, 107 insertions(+), 77 deletions(-) diff --git a/internal/crypto/chacha_test.go b/internal/crypto/chacha_test.go index 1b8e079..84c151a 100644 --- a/internal/crypto/chacha_test.go +++ b/internal/crypto/chacha_test.go @@ -64,7 +64,7 @@ func TestEncryptUniqueNonces(t *testing.T) { const iterations = 1024 nonceSize := chacha20poly1305.NonceSizeX seen := make(map[string]struct{}, iterations) - for i := 0; i < iterations; i++ { + for i := range iterations { ct, err := c.Encrypt([]byte("payload")) if err != nil { t.Fatalf("Encrypt() error = %v", err) @@ -140,7 +140,7 @@ func BenchmarkEncrypt(b *testing.B) { payload := bytes.Repeat([]byte{0xab}, 12*1024) b.ResetTimer() b.SetBytes(int64(len(payload))) - for i := 0; i < b.N; i++ { + for range b.N { if _, err := c.Encrypt(payload); err != nil { b.Fatalf("Encrypt() error = %v", err) } @@ -159,7 +159,7 @@ func BenchmarkDecrypt(b *testing.B) { } b.ResetTimer() b.SetBytes(int64(len(payload))) - for i := 0; i < b.N; i++ { + for range b.N { if _, err := c.Decrypt(ct); err != nil { b.Fatalf("Decrypt() error = %v", err) } diff --git a/internal/e2e/local_soak_test.go b/internal/e2e/local_soak_test.go index bfbba18..2f9fbcb 100644 --- a/internal/e2e/local_soak_test.go +++ b/internal/e2e/local_soak_test.go @@ -110,7 +110,7 @@ func TestLocalThroughputSoak(t *testing.T) { pumpCtx, cancelPump := context.WithTimeout(context.Background(), *localSoakDuration) defer cancelPump() - stats := runLocalSoakPump(t, pumpCtx, conn, *localSoakChunk, *localSoakVerify, *localSoakProgress) + stats := runLocalSoakPump(pumpCtx, t, conn, *localSoakChunk, *localSoakVerify, *localSoakProgress) if stats.sent == 0 || stats.recv == 0 { t.Fatalf("no traffic moved: sent=%d recv=%d", stats.sent, stats.recv) @@ -189,8 +189,8 @@ type localSoakStats struct { // same conn until ctx expires, periodically logging progress. Bytes are // counted atomically so the progress logger sees a coherent snapshot. func runLocalSoakPump( - t *testing.T, ctx context.Context, + t *testing.T, conn net.Conn, chunkSize int, verify bool, @@ -202,34 +202,7 @@ func runLocalSoakPump( start := time.Now() progressDone := make(chan struct{}) - go func() { - defer close(progressDone) - if progressEvery <= 0 { - return - } - ticker := time.NewTicker(progressEvery) - defer ticker.Stop() - var lastSent, lastRecv int64 - lastTime := start - for { - select { - case <-ctx.Done(): - return - case now := <-ticker.C: - s, r := sent.Load(), recv.Load() - dt := now.Sub(lastTime).Seconds() - instSendRate := int64(float64(s-lastSent) / dt) - instRecvRate := int64(float64(r-lastRecv) / dt) - t.Logf("[soak] elapsed=%s sent=%s recv=%s tx=%s/s rx=%s/s", - now.Sub(start).Round(time.Second), - humanBytes(s), humanBytes(r), - humanBytes(instSendRate), humanBytes(instRecvRate), - ) - lastSent, lastRecv = s, r - lastTime = now - } - } - }() + go runLocalSoakProgress(ctx, t, &sent, &recv, start, progressEvery, progressDone) var ( wg sync.WaitGroup @@ -244,50 +217,8 @@ func runLocalSoakPump( } wg.Add(2) - - // Writer: deterministic pattern by absolute byte offset. - go func() { - defer wg.Done() - buf := make([]byte, chunkSize) - var off int64 - for ctx.Err() == nil { - fillPattern(buf, off) - if _, err := conn.Write(buf); err != nil { - if ctx.Err() == nil { - recordErr(fmt.Errorf("write at %d: %w", off, err)) - } - return - } - off += int64(chunkSize) - sent.Add(int64(chunkSize)) - } - }() - - // Reader: io.ReadFull echoed bytes, optionally verify against pattern. - go func() { - defer wg.Done() - rdr := bufio.NewReader(conn) - echoed := make([]byte, chunkSize) - want := make([]byte, chunkSize) - var off int64 - for ctx.Err() == nil { - if _, err := io.ReadFull(rdr, echoed); err != nil { - if ctx.Err() == nil { - recordErr(fmt.Errorf("read at %d: %w", off, err)) - } - return - } - if verify { - fillPattern(want, off) - if !bytes.Equal(echoed, want) { - recordErr(fmt.Errorf("%w at offset %d", errLocalSoakPayloadMismatch, off)) - return - } - } - off += int64(chunkSize) - recv.Add(int64(chunkSize)) - } - }() + go pumpWriter(ctx, conn, chunkSize, &sent, &wg, recordErr) + go pumpReader(ctx, conn, chunkSize, verify, &recv, &wg, recordErr) <-ctx.Done() // Force-close the conn so both pumps unblock from any in-flight I/O. @@ -304,6 +235,105 @@ func runLocalSoakPump( } } +// runLocalSoakProgress emits periodic throughput lines until ctx fires. +func runLocalSoakProgress( + ctx context.Context, + t *testing.T, + sent, recv *atomic.Int64, + start time.Time, + progressEvery time.Duration, + done chan<- struct{}, +) { + t.Helper() + defer close(done) + if progressEvery <= 0 { + return + } + ticker := time.NewTicker(progressEvery) + defer ticker.Stop() + var lastSent, lastRecv int64 + lastTime := start + for { + select { + case <-ctx.Done(): + return + case now := <-ticker.C: + s, r := sent.Load(), recv.Load() + dt := now.Sub(lastTime).Seconds() + instSendRate := int64(float64(s-lastSent) / dt) + instRecvRate := int64(float64(r-lastRecv) / dt) + t.Logf("[soak] elapsed=%s sent=%s recv=%s tx=%s/s rx=%s/s", + now.Sub(start).Round(time.Second), + humanBytes(s), humanBytes(r), + humanBytes(instSendRate), humanBytes(instRecvRate), + ) + lastSent, lastRecv = s, r + lastTime = now + } + } +} + +// pumpWriter pushes a deterministic byte pattern through conn until ctx +// expires or the connection errors out. +func pumpWriter( + ctx context.Context, + conn net.Conn, + chunkSize int, + sent *atomic.Int64, + wg *sync.WaitGroup, + recordErr func(error), +) { + defer wg.Done() + buf := make([]byte, chunkSize) + var off int64 + for ctx.Err() == nil { + fillPattern(buf, off) + if _, err := conn.Write(buf); err != nil { + if ctx.Err() == nil { + recordErr(fmt.Errorf("write at %d: %w", off, err)) + } + return + } + off += int64(chunkSize) + sent.Add(int64(chunkSize)) + } +} + +// pumpReader reads echoed bytes back, optionally verifying them against +// the deterministic pattern that pumpWriter produced at the same offset. +func pumpReader( + ctx context.Context, + conn net.Conn, + chunkSize int, + verify bool, + recv *atomic.Int64, + wg *sync.WaitGroup, + recordErr func(error), +) { + defer wg.Done() + rdr := bufio.NewReader(conn) + echoed := make([]byte, chunkSize) + want := make([]byte, chunkSize) + var off int64 + for ctx.Err() == nil { + if _, err := io.ReadFull(rdr, echoed); err != nil { + if ctx.Err() == nil { + recordErr(fmt.Errorf("read at %d: %w", off, err)) + } + return + } + if verify { + fillPattern(want, off) + if !bytes.Equal(echoed, want) { + recordErr(fmt.Errorf("%w at offset %d", errLocalSoakPayloadMismatch, off)) + return + } + } + off += int64(chunkSize) + recv.Add(int64(chunkSize)) + } +} + // isExpectedShutdownErr filters errors that just mean "we asked the conn // to stop" — deadline expirations from our SetDeadline kick, EOF from the // peer half-closing, etc.