mirror of
https://github.com/openlibrecommunity/olcrtc.git
synced 2026-05-26 23:19:47 +00:00
refactor(e2e): extract soak pump helpers and modernize range lnt
This commit is contained in:
@@ -64,7 +64,7 @@ func TestEncryptUniqueNonces(t *testing.T) {
|
||||
const iterations = 1024
|
||||
nonceSize := chacha20poly1305.NonceSizeX
|
||||
seen := make(map[string]struct{}, iterations)
|
||||
for i := 0; i < iterations; i++ {
|
||||
for i := range iterations {
|
||||
ct, err := c.Encrypt([]byte("payload"))
|
||||
if err != nil {
|
||||
t.Fatalf("Encrypt() error = %v", err)
|
||||
@@ -140,7 +140,7 @@ func BenchmarkEncrypt(b *testing.B) {
|
||||
payload := bytes.Repeat([]byte{0xab}, 12*1024)
|
||||
b.ResetTimer()
|
||||
b.SetBytes(int64(len(payload)))
|
||||
for i := 0; i < b.N; i++ {
|
||||
for range b.N {
|
||||
if _, err := c.Encrypt(payload); err != nil {
|
||||
b.Fatalf("Encrypt() error = %v", err)
|
||||
}
|
||||
@@ -159,7 +159,7 @@ func BenchmarkDecrypt(b *testing.B) {
|
||||
}
|
||||
b.ResetTimer()
|
||||
b.SetBytes(int64(len(payload)))
|
||||
for i := 0; i < b.N; i++ {
|
||||
for range b.N {
|
||||
if _, err := c.Decrypt(ct); err != nil {
|
||||
b.Fatalf("Decrypt() error = %v", err)
|
||||
}
|
||||
|
||||
@@ -110,7 +110,7 @@ func TestLocalThroughputSoak(t *testing.T) {
|
||||
pumpCtx, cancelPump := context.WithTimeout(context.Background(), *localSoakDuration)
|
||||
defer cancelPump()
|
||||
|
||||
stats := runLocalSoakPump(t, pumpCtx, conn, *localSoakChunk, *localSoakVerify, *localSoakProgress)
|
||||
stats := runLocalSoakPump(pumpCtx, t, conn, *localSoakChunk, *localSoakVerify, *localSoakProgress)
|
||||
|
||||
if stats.sent == 0 || stats.recv == 0 {
|
||||
t.Fatalf("no traffic moved: sent=%d recv=%d", stats.sent, stats.recv)
|
||||
@@ -189,8 +189,8 @@ type localSoakStats struct {
|
||||
// same conn until ctx expires, periodically logging progress. Bytes are
|
||||
// counted atomically so the progress logger sees a coherent snapshot.
|
||||
func runLocalSoakPump(
|
||||
t *testing.T,
|
||||
ctx context.Context,
|
||||
t *testing.T,
|
||||
conn net.Conn,
|
||||
chunkSize int,
|
||||
verify bool,
|
||||
@@ -202,34 +202,7 @@ func runLocalSoakPump(
|
||||
start := time.Now()
|
||||
|
||||
progressDone := make(chan struct{})
|
||||
go func() {
|
||||
defer close(progressDone)
|
||||
if progressEvery <= 0 {
|
||||
return
|
||||
}
|
||||
ticker := time.NewTicker(progressEvery)
|
||||
defer ticker.Stop()
|
||||
var lastSent, lastRecv int64
|
||||
lastTime := start
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case now := <-ticker.C:
|
||||
s, r := sent.Load(), recv.Load()
|
||||
dt := now.Sub(lastTime).Seconds()
|
||||
instSendRate := int64(float64(s-lastSent) / dt)
|
||||
instRecvRate := int64(float64(r-lastRecv) / dt)
|
||||
t.Logf("[soak] elapsed=%s sent=%s recv=%s tx=%s/s rx=%s/s",
|
||||
now.Sub(start).Round(time.Second),
|
||||
humanBytes(s), humanBytes(r),
|
||||
humanBytes(instSendRate), humanBytes(instRecvRate),
|
||||
)
|
||||
lastSent, lastRecv = s, r
|
||||
lastTime = now
|
||||
}
|
||||
}
|
||||
}()
|
||||
go runLocalSoakProgress(ctx, t, &sent, &recv, start, progressEvery, progressDone)
|
||||
|
||||
var (
|
||||
wg sync.WaitGroup
|
||||
@@ -244,50 +217,8 @@ func runLocalSoakPump(
|
||||
}
|
||||
|
||||
wg.Add(2)
|
||||
|
||||
// Writer: deterministic pattern by absolute byte offset.
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
buf := make([]byte, chunkSize)
|
||||
var off int64
|
||||
for ctx.Err() == nil {
|
||||
fillPattern(buf, off)
|
||||
if _, err := conn.Write(buf); err != nil {
|
||||
if ctx.Err() == nil {
|
||||
recordErr(fmt.Errorf("write at %d: %w", off, err))
|
||||
}
|
||||
return
|
||||
}
|
||||
off += int64(chunkSize)
|
||||
sent.Add(int64(chunkSize))
|
||||
}
|
||||
}()
|
||||
|
||||
// Reader: io.ReadFull echoed bytes, optionally verify against pattern.
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
rdr := bufio.NewReader(conn)
|
||||
echoed := make([]byte, chunkSize)
|
||||
want := make([]byte, chunkSize)
|
||||
var off int64
|
||||
for ctx.Err() == nil {
|
||||
if _, err := io.ReadFull(rdr, echoed); err != nil {
|
||||
if ctx.Err() == nil {
|
||||
recordErr(fmt.Errorf("read at %d: %w", off, err))
|
||||
}
|
||||
return
|
||||
}
|
||||
if verify {
|
||||
fillPattern(want, off)
|
||||
if !bytes.Equal(echoed, want) {
|
||||
recordErr(fmt.Errorf("%w at offset %d", errLocalSoakPayloadMismatch, off))
|
||||
return
|
||||
}
|
||||
}
|
||||
off += int64(chunkSize)
|
||||
recv.Add(int64(chunkSize))
|
||||
}
|
||||
}()
|
||||
go pumpWriter(ctx, conn, chunkSize, &sent, &wg, recordErr)
|
||||
go pumpReader(ctx, conn, chunkSize, verify, &recv, &wg, recordErr)
|
||||
|
||||
<-ctx.Done()
|
||||
// Force-close the conn so both pumps unblock from any in-flight I/O.
|
||||
@@ -304,6 +235,105 @@ func runLocalSoakPump(
|
||||
}
|
||||
}
|
||||
|
||||
// runLocalSoakProgress emits periodic throughput lines until ctx fires.
|
||||
func runLocalSoakProgress(
|
||||
ctx context.Context,
|
||||
t *testing.T,
|
||||
sent, recv *atomic.Int64,
|
||||
start time.Time,
|
||||
progressEvery time.Duration,
|
||||
done chan<- struct{},
|
||||
) {
|
||||
t.Helper()
|
||||
defer close(done)
|
||||
if progressEvery <= 0 {
|
||||
return
|
||||
}
|
||||
ticker := time.NewTicker(progressEvery)
|
||||
defer ticker.Stop()
|
||||
var lastSent, lastRecv int64
|
||||
lastTime := start
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case now := <-ticker.C:
|
||||
s, r := sent.Load(), recv.Load()
|
||||
dt := now.Sub(lastTime).Seconds()
|
||||
instSendRate := int64(float64(s-lastSent) / dt)
|
||||
instRecvRate := int64(float64(r-lastRecv) / dt)
|
||||
t.Logf("[soak] elapsed=%s sent=%s recv=%s tx=%s/s rx=%s/s",
|
||||
now.Sub(start).Round(time.Second),
|
||||
humanBytes(s), humanBytes(r),
|
||||
humanBytes(instSendRate), humanBytes(instRecvRate),
|
||||
)
|
||||
lastSent, lastRecv = s, r
|
||||
lastTime = now
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// pumpWriter pushes a deterministic byte pattern through conn until ctx
|
||||
// expires or the connection errors out.
|
||||
func pumpWriter(
|
||||
ctx context.Context,
|
||||
conn net.Conn,
|
||||
chunkSize int,
|
||||
sent *atomic.Int64,
|
||||
wg *sync.WaitGroup,
|
||||
recordErr func(error),
|
||||
) {
|
||||
defer wg.Done()
|
||||
buf := make([]byte, chunkSize)
|
||||
var off int64
|
||||
for ctx.Err() == nil {
|
||||
fillPattern(buf, off)
|
||||
if _, err := conn.Write(buf); err != nil {
|
||||
if ctx.Err() == nil {
|
||||
recordErr(fmt.Errorf("write at %d: %w", off, err))
|
||||
}
|
||||
return
|
||||
}
|
||||
off += int64(chunkSize)
|
||||
sent.Add(int64(chunkSize))
|
||||
}
|
||||
}
|
||||
|
||||
// pumpReader reads echoed bytes back, optionally verifying them against
|
||||
// the deterministic pattern that pumpWriter produced at the same offset.
|
||||
func pumpReader(
|
||||
ctx context.Context,
|
||||
conn net.Conn,
|
||||
chunkSize int,
|
||||
verify bool,
|
||||
recv *atomic.Int64,
|
||||
wg *sync.WaitGroup,
|
||||
recordErr func(error),
|
||||
) {
|
||||
defer wg.Done()
|
||||
rdr := bufio.NewReader(conn)
|
||||
echoed := make([]byte, chunkSize)
|
||||
want := make([]byte, chunkSize)
|
||||
var off int64
|
||||
for ctx.Err() == nil {
|
||||
if _, err := io.ReadFull(rdr, echoed); err != nil {
|
||||
if ctx.Err() == nil {
|
||||
recordErr(fmt.Errorf("read at %d: %w", off, err))
|
||||
}
|
||||
return
|
||||
}
|
||||
if verify {
|
||||
fillPattern(want, off)
|
||||
if !bytes.Equal(echoed, want) {
|
||||
recordErr(fmt.Errorf("%w at offset %d", errLocalSoakPayloadMismatch, off))
|
||||
return
|
||||
}
|
||||
}
|
||||
off += int64(chunkSize)
|
||||
recv.Add(int64(chunkSize))
|
||||
}
|
||||
}
|
||||
|
||||
// isExpectedShutdownErr filters errors that just mean "we asked the conn
|
||||
// to stop" — deadline expirations from our SetDeadline kick, EOF from the
|
||||
// peer half-closing, etc.
|
||||
|
||||
Reference in New Issue
Block a user