fix(vp8channel): latch peer epoch on first frame received

This commit is contained in:
zarazaex69
2026-05-24 18:05:43 +03:00
parent 6d529c16a8
commit 83a94948ae
2 changed files with 14 additions and 22 deletions

View File

@@ -177,6 +177,7 @@ func New(ctx context.Context, cfg transport.Config) (transport.Transport, error)
tr := &streamTransport{
stream: stream,
track: track,
onData: cfg.OnData,
onPeerData: cfg.OnPeerData,
outbound: make(chan []byte, outboundQueueSize),
closeCh: make(chan struct{}),
@@ -658,13 +659,12 @@ func (p *streamTransport) readVP8Track(track *webrtc.TrackRemote) {
}
func (p *streamTransport) handleFirstPeer(peerEpoch uint32) {
logger.Infof("vp8channel: peer candidate epoch=0x%08x", peerEpoch)
p.peerEpoch.Store(peerEpoch)
p.peerConfirmed.Store(true)
logger.Infof("vp8channel: peer latched epoch=0x%08x", peerEpoch)
}
// handleIncomingFrame parses the epoch header and delivers KCP payload.
// In single-peer mode (client), frames from all epochs are delivered to
// KCP until the peer is confirmed (peerConfirmed=true). Once confirmed,
// only frames from the confirmed epoch are accepted.
func (p *streamTransport) handleIncomingFrame(frame []byte) {
frameToken, peerEpoch, ok := parseEpochHeader(frame)
if !ok {
@@ -684,16 +684,11 @@ func (p *streamTransport) handleIncomingFrame(frame []byte) {
return
}
// Single-peer mode: before peer is confirmed, accept frames from any
// epoch (the server's epoch is unknown until handshake completes).
// After confirmation, reject frames from other epochs.
if p.peerConfirmed.Load() {
if peerEpoch != p.peerEpoch.Load() {
return
}
} else {
// Track the latest candidate epoch for logging.
p.peerEpoch.Store(peerEpoch)
// Single-peer mode: latch on first epoch seen, ignore all others.
if !p.peerConfirmed.Load() {
p.handleFirstPeer(peerEpoch)
} else if peerEpoch != p.peerEpoch.Load() {
return
}
if len(kcpPayload) == 0 {

View File

@@ -355,10 +355,10 @@ func TestHandleIncomingFrameEpochFilteringAndReconnect(t *testing.T) {
t.Fatal("filtered frames changed peer state")
}
// Keepalive (nil payload) stores candidate epoch but does not confirm.
// Keepalive (nil payload) latches peer immediately.
tr.handleIncomingFrame(mkFrame(tr.bindingToken, 1, nil))
if tr.peerConfirmed.Load() {
t.Fatal("keepalive should not confirm peer")
if !tr.peerConfirmed.Load() {
t.Fatal("first frame should confirm peer")
}
if tr.peerEpoch.Load() != 1 {
t.Fatalf("peer epoch not stored: got %d want 1", tr.peerEpoch.Load())
@@ -378,11 +378,8 @@ func TestHandleIncomingFrameEpochFilteringAndReconnect(t *testing.T) {
t.Fatalf("stream reconnect did not reset/callback: reconnected=%v kcp=%v", reconnected, tr.kcp)
}
reconnected = false
// Confirm the peer so subsequent frames from other epochs are rejected.
tr.peerConfirmed.Store(true)
tr.peerEpoch.Store(1)
// In single-peer mode, frames from a different epoch are ignored (other
// participants in the room). The client does NOT reconnect.
// Peer is already confirmed from the first frame above.
// In single-peer mode, frames from a different epoch are ignored.
tr.handleIncomingFrame(mkFrame(tr.bindingToken, 2, []byte("other-participant")))
if reconnected {
t.Fatal("epoch change from another participant should not trigger reconnect")