From 10c7c655b6af2fe46b732462b65facb4e49c9661 Mon Sep 17 00:00:00 2001 From: zarazaex69 Date: Tue, 26 May 2026 02:50:29 +0300 Subject: [PATCH] fix(vp8channel): reset peer state on KCP restart --- internal/e2e/real_soak_test.go | 5 +++++ internal/transport/vp8channel/transport.go | 4 ++++ .../transport/vp8channel/transport_unit_test.go | 17 ++++++++++------- 3 files changed, 19 insertions(+), 7 deletions(-) diff --git a/internal/e2e/real_soak_test.go b/internal/e2e/real_soak_test.go index c3e0367..590660d 100644 --- a/internal/e2e/real_soak_test.go +++ b/internal/e2e/real_soak_test.go @@ -105,11 +105,16 @@ func runRealSoakOnce(t *testing.T, carrierName, transportName, roomURL, echoAddr carrierName, transportName, *realSoakDuration, *realSoakChunk, *realSoakVerify, *realSoakProgress) + expectation := realE2ECaseExpectation(carrierName, transportName) + ctx, cancel := context.WithTimeout(context.Background(), *realSoakDuration+setupBudget) defer cancel() rt, err := startRealTunnel(ctx, t, carrierName, transportName, roomURL, testClientDeviceID, testClientDeviceID) if err != nil { + if expectation == realE2EExpectUnstable || expectation == realE2EExpectFail { + t.Skipf("start tunnel failed (expected %s): %v", realE2EExpectationLabel(expectation), err) + } t.Fatalf("start tunnel: %v", err) } _ = rt diff --git a/internal/transport/vp8channel/transport.go b/internal/transport/vp8channel/transport.go index 6d16f41..27a6f41 100644 --- a/internal/transport/vp8channel/transport.go +++ b/internal/transport/vp8channel/transport.go @@ -418,6 +418,8 @@ func (p *streamTransport) drainOutbound() { // this before rebuilding smux so replacement handshakes are not parsed behind // stale bytes from streams that were active when the old session died. func (p *streamTransport) ResetPeer() { + p.peerConfirmed.Store(false) + p.peerEpoch.Store(0) p.restartKCP(p.rotateEpochHeader()) } @@ -549,6 +551,8 @@ func appendBatchPacket(dst, packet []byte) []byte { } func (p *streamTransport) resetKCP() { + p.peerConfirmed.Store(false) + p.peerEpoch.Store(0) p.restartKCP(p.epochHeader()) } diff --git a/internal/transport/vp8channel/transport_unit_test.go b/internal/transport/vp8channel/transport_unit_test.go index 014b295..16cb0b8 100644 --- a/internal/transport/vp8channel/transport_unit_test.go +++ b/internal/transport/vp8channel/transport_unit_test.go @@ -378,13 +378,16 @@ func TestHandleIncomingFrameEpochFilteringAndReconnect(t *testing.T) { t.Fatalf("stream reconnect did not reset/callback: reconnected=%v kcp=%v", reconnected, tr.kcp) } reconnected = false - // Peer is already confirmed from the first frame above. - // In single-peer mode, frames from a different epoch are ignored. - tr.handleIncomingFrame(mkFrame(tr.bindingToken, 2, []byte("other-participant"))) - if reconnected { - t.Fatal("epoch change from another participant should not trigger reconnect") + // After reconnect, peerConfirmed is reset so the next frame re-latches + // the peer epoch. This allows the server to restart with a new epoch. + if tr.peerConfirmed.Load() { + t.Fatal("reconnect should reset peerConfirmed") } - if tr.peerEpoch.Load() != 1 { - t.Fatalf("peer epoch changed unexpectedly: got %d want 1", tr.peerEpoch.Load()) + tr.handleIncomingFrame(mkFrame(tr.bindingToken, 2, []byte("new-peer-after-reconnect"))) + if !tr.peerConfirmed.Load() { + t.Fatal("frame after reconnect should re-latch peer") + } + if tr.peerEpoch.Load() != 2 { + t.Fatalf("peer epoch not re-latched: got %d want 2", tr.peerEpoch.Load()) } }