From 83a94948aeb2586425aef5bdb35e202be388f564 Mon Sep 17 00:00:00 2001 From: zarazaex69 Date: Sun, 24 May 2026 18:05:43 +0300 Subject: [PATCH] fix(vp8channel): latch peer epoch on first frame received --- internal/transport/vp8channel/transport.go | 23 ++++++++----------- .../vp8channel/transport_unit_test.go | 13 ++++------- 2 files changed, 14 insertions(+), 22 deletions(-) diff --git a/internal/transport/vp8channel/transport.go b/internal/transport/vp8channel/transport.go index 124e934..c20afe8 100644 --- a/internal/transport/vp8channel/transport.go +++ b/internal/transport/vp8channel/transport.go @@ -177,6 +177,7 @@ func New(ctx context.Context, cfg transport.Config) (transport.Transport, error) tr := &streamTransport{ stream: stream, track: track, + onData: cfg.OnData, onPeerData: cfg.OnPeerData, outbound: make(chan []byte, outboundQueueSize), closeCh: make(chan struct{}), @@ -658,13 +659,12 @@ func (p *streamTransport) readVP8Track(track *webrtc.TrackRemote) { } func (p *streamTransport) handleFirstPeer(peerEpoch uint32) { - logger.Infof("vp8channel: peer candidate epoch=0x%08x", peerEpoch) + p.peerEpoch.Store(peerEpoch) + p.peerConfirmed.Store(true) + logger.Infof("vp8channel: peer latched epoch=0x%08x", peerEpoch) } // handleIncomingFrame parses the epoch header and delivers KCP payload. -// In single-peer mode (client), frames from all epochs are delivered to -// KCP until the peer is confirmed (peerConfirmed=true). Once confirmed, -// only frames from the confirmed epoch are accepted. func (p *streamTransport) handleIncomingFrame(frame []byte) { frameToken, peerEpoch, ok := parseEpochHeader(frame) if !ok { @@ -684,16 +684,11 @@ func (p *streamTransport) handleIncomingFrame(frame []byte) { return } - // Single-peer mode: before peer is confirmed, accept frames from any - // epoch (the server's epoch is unknown until handshake completes). - // After confirmation, reject frames from other epochs. - if p.peerConfirmed.Load() { - if peerEpoch != p.peerEpoch.Load() { - return - } - } else { - // Track the latest candidate epoch for logging. - p.peerEpoch.Store(peerEpoch) + // Single-peer mode: latch on first epoch seen, ignore all others. + if !p.peerConfirmed.Load() { + p.handleFirstPeer(peerEpoch) + } else if peerEpoch != p.peerEpoch.Load() { + return } if len(kcpPayload) == 0 { diff --git a/internal/transport/vp8channel/transport_unit_test.go b/internal/transport/vp8channel/transport_unit_test.go index 0cc2aeb..014b295 100644 --- a/internal/transport/vp8channel/transport_unit_test.go +++ b/internal/transport/vp8channel/transport_unit_test.go @@ -355,10 +355,10 @@ func TestHandleIncomingFrameEpochFilteringAndReconnect(t *testing.T) { t.Fatal("filtered frames changed peer state") } - // Keepalive (nil payload) stores candidate epoch but does not confirm. + // Keepalive (nil payload) latches peer immediately. tr.handleIncomingFrame(mkFrame(tr.bindingToken, 1, nil)) - if tr.peerConfirmed.Load() { - t.Fatal("keepalive should not confirm peer") + if !tr.peerConfirmed.Load() { + t.Fatal("first frame should confirm peer") } if tr.peerEpoch.Load() != 1 { t.Fatalf("peer epoch not stored: got %d want 1", tr.peerEpoch.Load()) @@ -378,11 +378,8 @@ func TestHandleIncomingFrameEpochFilteringAndReconnect(t *testing.T) { t.Fatalf("stream reconnect did not reset/callback: reconnected=%v kcp=%v", reconnected, tr.kcp) } reconnected = false - // Confirm the peer so subsequent frames from other epochs are rejected. - tr.peerConfirmed.Store(true) - tr.peerEpoch.Store(1) - // In single-peer mode, frames from a different epoch are ignored (other - // participants in the room). The client does NOT reconnect. + // Peer is already confirmed from the first frame above. + // In single-peer mode, frames from a different epoch are ignored. tr.handleIncomingFrame(mkFrame(tr.bindingToken, 2, []byte("other-participant"))) if reconnected { t.Fatal("epoch change from another participant should not trigger reconnect")