From a9512d2488c25c547852a780dffefc2150ac5731 Mon Sep 17 00:00:00 2001 From: zarazaex69 Date: Fri, 15 May 2026 23:09:24 +0300 Subject: [PATCH] Revert "fix(transport): pin peer channel after validation" This reverts commit 7f9351dad68336951f9fb6a889a412e73ddb789f. --- internal/transport/seichannel/transport.go | 41 ++++++------------- .../seichannel/transport_unit_test.go | 2 +- internal/transport/videochannel/transport.go | 41 ++++++------------- .../videochannel/transport_unit_test.go | 2 +- 4 files changed, 28 insertions(+), 58 deletions(-) diff --git a/internal/transport/seichannel/transport.go b/internal/transport/seichannel/transport.go index 61f5fd0..50c1e8a 100644 --- a/internal/transport/seichannel/transport.go +++ b/internal/transport/seichannel/transport.go @@ -446,34 +446,31 @@ func (p *streamTransport) handleSample(sample []byte) { continue } - // Multi-party MUCs (e.g. Jitsi) can deliver frames from other peers, - // or RTP echo from previously-closed sessions, to our PeerConnection. - // Once we've identified the real partner's channelID, drop everything - // else. We can't pin the partner from a raw frame header alone — a - // stray RTP packet might decode to a valid magic/version by chance — - // so the pin happens downstream, only after a CRC-validated payload - // (DATA) or a matching ACK waiter has confirmed the sender is ours. - if pinned := p.peerChannelID.Load(); pinned != 0 && frame.channelID != pinned { + // Multi-party MUCs (e.g. Jitsi) can deliver frames from other + // peers — or RTP echo from previously-closed sessions — to our + // PeerConnection. The first valid frame we see fixes the peer's + // channelID; later frames with a different ID are silently dropped. + if !p.acceptChannel(frame.channelID) { continue } switch frame.typ { case frameTypeAck: - p.resolveAck(frame.channelID, frame.seq, frame.crc) + p.resolveAck(frame.seq, frame.crc) case frameTypeData: p.handleInboundFrame(frame) } } } -// pinPeerChannel commits the partner's channelID after a frame from them has -// been validated downstream. It's a one-shot CAS — later validated frames -// keep the same partner. id==0 is never accepted. -func (p *streamTransport) pinPeerChannel(id uint32) { +func (p *streamTransport) acceptChannel(id uint32) bool { if id == 0 { - return + return false } - p.peerChannelID.CompareAndSwap(0, id) + if p.peerChannelID.CompareAndSwap(0, id) { + return true + } + return p.peerChannelID.Load() == id } func (p *streamTransport) upsertInbound(frame transportFrame) (*inboundMessage, bool) { @@ -514,9 +511,6 @@ func (p *streamTransport) handleInboundFrame(frame transportFrame) { p.recvMu.Lock() if crc, ok := p.delivered[frame.seq]; ok && crc == frame.crc { p.recvMu.Unlock() - // Already-delivered duplicate: the peer is genuine (we accepted - // this seq earlier and CRC-matched it), so pin and re-ack. - p.pinPeerChannel(frame.channelID) p.sendAck(frame.seq, frame.crc) return } @@ -541,11 +535,6 @@ func (p *streamTransport) handleInboundFrame(frame transportFrame) { p.delivered[frame.seq] = msg.crc p.recvMu.Unlock() - // CRC validated end-to-end — this is our real partner. Pin their - // channelID so future stray frames from other MUC participants are - // dropped before reaching the reassembler. - p.pinPeerChannel(frame.channelID) - if p.onData != nil { p.onData(data) } @@ -556,7 +545,7 @@ func (p *streamTransport) sendAck(seq, crc uint32) { _ = p.enqueueFrame(encodeAckFrame(p.localChannelID, seq, crc), true) } -func (p *streamTransport) resolveAck(channelID, seq, crc uint32) { +func (p *streamTransport) resolveAck(seq, crc uint32) { p.ackMu.Lock() waiter := p.ackWaiters[seq] p.ackMu.Unlock() @@ -565,10 +554,6 @@ func (p *streamTransport) resolveAck(channelID, seq, crc uint32) { return } - // The ACK matched a seq we're actually waiting for, so it came from our - // real partner; pin their channelID for downstream filtering. - p.pinPeerChannel(channelID) - select { case waiter <- crc: default: diff --git a/internal/transport/seichannel/transport_unit_test.go b/internal/transport/seichannel/transport_unit_test.go index b64f1d8..00abf58 100644 --- a/internal/transport/seichannel/transport_unit_test.go +++ b/internal/transport/seichannel/transport_unit_test.go @@ -162,7 +162,7 @@ func TestSendAckAndClosePaths(t *testing.T) { if err != nil { t.Fatalf("decodeTransportFrame() error = %v", err) } - tr.resolveAck(decoded.channelID, decoded.seq, crc32.ChecksumIEEE(payload)) + tr.resolveAck(decoded.seq, crc32.ChecksumIEEE(payload)) case <-time.After(time.Second): t.Fatal("Send() did not enqueue frame") } diff --git a/internal/transport/videochannel/transport.go b/internal/transport/videochannel/transport.go index 614fd7f..26ce8fa 100644 --- a/internal/transport/videochannel/transport.go +++ b/internal/transport/videochannel/transport.go @@ -547,33 +547,30 @@ func (p *streamTransport) handleFrame(frame []byte) { return } - // Multi-party MUCs (e.g. Jitsi) can deliver frames from other peers, or - // video echo from previously-closed sessions, to our PeerConnection. - // Once we've identified the real partner's channelID, drop everything - // else. We can't pin the partner from a raw frame header alone — a stray - // video frame might decode to a valid magic/version by chance — so the - // pin happens downstream, only after a CRC-validated payload (DATA) or a - // matching ACK waiter has confirmed the sender is ours. - if pinned := p.peerChannelID.Load(); pinned != 0 && decoded.channelID != pinned { + // Multi-party MUCs (e.g. Jitsi) can deliver frames from other peers — or + // video echo from previously-closed sessions — to our PeerConnection. + // The first valid frame we see fixes the peer's channelID; later frames + // with a different ID are silently dropped. + if !p.acceptChannel(decoded.channelID) { return } switch decoded.typ { case frameTypeAck: - p.resolveAck(decoded.channelID, decoded.seq, decoded.crc) + p.resolveAck(decoded.seq, decoded.crc) case frameTypeData: p.handleInboundFrame(decoded) } } -// pinPeerChannel commits the partner's channelID after a frame from them has -// been validated downstream. It's a one-shot CAS — later validated frames -// keep the same partner. id==0 is never accepted. -func (p *streamTransport) pinPeerChannel(id uint32) { +func (p *streamTransport) acceptChannel(id uint32) bool { if id == 0 { - return + return false } - p.peerChannelID.CompareAndSwap(0, id) + if p.peerChannelID.CompareAndSwap(0, id) { + return true + } + return p.peerChannelID.Load() == id } func (p *streamTransport) upsertInbound(frame transportFrame) (*inboundMessage, bool) { @@ -614,9 +611,6 @@ func (p *streamTransport) handleInboundFrame(frame transportFrame) { p.recvMu.Lock() if crc, ok := p.delivered[frame.seq]; ok && crc == frame.crc { p.recvMu.Unlock() - // Already-delivered duplicate: the peer is genuine (we accepted - // this seq earlier and CRC-matched it), so pin and re-ack. - p.pinPeerChannel(frame.channelID) p.sendAck(frame.seq, frame.crc) return } @@ -641,11 +635,6 @@ func (p *streamTransport) handleInboundFrame(frame transportFrame) { p.delivered[frame.seq] = msg.crc p.recvMu.Unlock() - // CRC validated end-to-end — this is our real partner. Pin their - // channelID so future stray frames from other MUC participants are - // dropped before reaching the reassembler. - p.pinPeerChannel(frame.channelID) - if p.onData != nil { p.onData(data) } @@ -656,7 +645,7 @@ func (p *streamTransport) sendAck(seq, crc uint32) { _ = p.enqueueFrame(encodeAckFrame(p.localChannelID, seq, crc), true) } -func (p *streamTransport) resolveAck(channelID, seq, crc uint32) { +func (p *streamTransport) resolveAck(seq, crc uint32) { p.ackMu.Lock() waiter := p.ackWaiters[seq] p.ackMu.Unlock() @@ -665,10 +654,6 @@ func (p *streamTransport) resolveAck(channelID, seq, crc uint32) { return } - // The ACK matched a seq we're actually waiting for, so it came from our - // real partner; pin their channelID for downstream filtering. - p.pinPeerChannel(channelID) - select { case waiter <- crc: default: diff --git a/internal/transport/videochannel/transport_unit_test.go b/internal/transport/videochannel/transport_unit_test.go index b0ae949..3a9357e 100644 --- a/internal/transport/videochannel/transport_unit_test.go +++ b/internal/transport/videochannel/transport_unit_test.go @@ -153,7 +153,7 @@ func TestSendAckAndClosePaths(t *testing.T) { if err != nil { t.Fatalf("decodeTransportFrame() error = %v", err) } - tr.resolveAck(decoded.channelID, decoded.seq, crc32.ChecksumIEEE(payload)) + tr.resolveAck(decoded.seq, crc32.ChecksumIEEE(payload)) case <-time.After(time.Second): t.Fatal("Send() did not enqueue frame") }