Revert "fix(transport): pin peer channel after validation"

This reverts commit 7f9351dad6.
This commit is contained in:
zarazaex69
2026-05-15 23:09:24 +03:00
parent 7f9351dad6
commit a9512d2488
4 changed files with 28 additions and 58 deletions

View File

@@ -446,34 +446,31 @@ func (p *streamTransport) handleSample(sample []byte) {
continue
}
// Multi-party MUCs (e.g. Jitsi) can deliver frames from other peers,
// or RTP echo from previously-closed sessions, to our PeerConnection.
// Once we've identified the real partner's channelID, drop everything
// else. We can't pin the partner from a raw frame header alone — a
// stray RTP packet might decode to a valid magic/version by chance —
// so the pin happens downstream, only after a CRC-validated payload
// (DATA) or a matching ACK waiter has confirmed the sender is ours.
if pinned := p.peerChannelID.Load(); pinned != 0 && frame.channelID != pinned {
// Multi-party MUCs (e.g. Jitsi) can deliver frames from other
// peers — or RTP echo from previously-closed sessions to our
// PeerConnection. The first valid frame we see fixes the peer's
// channelID; later frames with a different ID are silently dropped.
if !p.acceptChannel(frame.channelID) {
continue
}
switch frame.typ {
case frameTypeAck:
p.resolveAck(frame.channelID, frame.seq, frame.crc)
p.resolveAck(frame.seq, frame.crc)
case frameTypeData:
p.handleInboundFrame(frame)
}
}
}
// pinPeerChannel commits the partner's channelID after a frame from them has
// been validated downstream. It's a one-shot CAS — later validated frames
// keep the same partner. id==0 is never accepted.
func (p *streamTransport) pinPeerChannel(id uint32) {
func (p *streamTransport) acceptChannel(id uint32) bool {
if id == 0 {
return
return false
}
p.peerChannelID.CompareAndSwap(0, id)
if p.peerChannelID.CompareAndSwap(0, id) {
return true
}
return p.peerChannelID.Load() == id
}
func (p *streamTransport) upsertInbound(frame transportFrame) (*inboundMessage, bool) {
@@ -514,9 +511,6 @@ func (p *streamTransport) handleInboundFrame(frame transportFrame) {
p.recvMu.Lock()
if crc, ok := p.delivered[frame.seq]; ok && crc == frame.crc {
p.recvMu.Unlock()
// Already-delivered duplicate: the peer is genuine (we accepted
// this seq earlier and CRC-matched it), so pin and re-ack.
p.pinPeerChannel(frame.channelID)
p.sendAck(frame.seq, frame.crc)
return
}
@@ -541,11 +535,6 @@ func (p *streamTransport) handleInboundFrame(frame transportFrame) {
p.delivered[frame.seq] = msg.crc
p.recvMu.Unlock()
// CRC validated end-to-end — this is our real partner. Pin their
// channelID so future stray frames from other MUC participants are
// dropped before reaching the reassembler.
p.pinPeerChannel(frame.channelID)
if p.onData != nil {
p.onData(data)
}
@@ -556,7 +545,7 @@ func (p *streamTransport) sendAck(seq, crc uint32) {
_ = p.enqueueFrame(encodeAckFrame(p.localChannelID, seq, crc), true)
}
func (p *streamTransport) resolveAck(channelID, seq, crc uint32) {
func (p *streamTransport) resolveAck(seq, crc uint32) {
p.ackMu.Lock()
waiter := p.ackWaiters[seq]
p.ackMu.Unlock()
@@ -565,10 +554,6 @@ func (p *streamTransport) resolveAck(channelID, seq, crc uint32) {
return
}
// The ACK matched a seq we're actually waiting for, so it came from our
// real partner; pin their channelID for downstream filtering.
p.pinPeerChannel(channelID)
select {
case waiter <- crc:
default:

View File

@@ -162,7 +162,7 @@ func TestSendAckAndClosePaths(t *testing.T) {
if err != nil {
t.Fatalf("decodeTransportFrame() error = %v", err)
}
tr.resolveAck(decoded.channelID, decoded.seq, crc32.ChecksumIEEE(payload))
tr.resolveAck(decoded.seq, crc32.ChecksumIEEE(payload))
case <-time.After(time.Second):
t.Fatal("Send() did not enqueue frame")
}

View File

@@ -547,33 +547,30 @@ func (p *streamTransport) handleFrame(frame []byte) {
return
}
// Multi-party MUCs (e.g. Jitsi) can deliver frames from other peers, or
// video echo from previously-closed sessions, to our PeerConnection.
// Once we've identified the real partner's channelID, drop everything
// else. We can't pin the partner from a raw frame header alone — a stray
// video frame might decode to a valid magic/version by chance — so the
// pin happens downstream, only after a CRC-validated payload (DATA) or a
// matching ACK waiter has confirmed the sender is ours.
if pinned := p.peerChannelID.Load(); pinned != 0 && decoded.channelID != pinned {
// Multi-party MUCs (e.g. Jitsi) can deliver frames from other peers or
// video echo from previously-closed sessions to our PeerConnection.
// The first valid frame we see fixes the peer's channelID; later frames
// with a different ID are silently dropped.
if !p.acceptChannel(decoded.channelID) {
return
}
switch decoded.typ {
case frameTypeAck:
p.resolveAck(decoded.channelID, decoded.seq, decoded.crc)
p.resolveAck(decoded.seq, decoded.crc)
case frameTypeData:
p.handleInboundFrame(decoded)
}
}
// pinPeerChannel commits the partner's channelID after a frame from them has
// been validated downstream. It's a one-shot CAS — later validated frames
// keep the same partner. id==0 is never accepted.
func (p *streamTransport) pinPeerChannel(id uint32) {
func (p *streamTransport) acceptChannel(id uint32) bool {
if id == 0 {
return
return false
}
p.peerChannelID.CompareAndSwap(0, id)
if p.peerChannelID.CompareAndSwap(0, id) {
return true
}
return p.peerChannelID.Load() == id
}
func (p *streamTransport) upsertInbound(frame transportFrame) (*inboundMessage, bool) {
@@ -614,9 +611,6 @@ func (p *streamTransport) handleInboundFrame(frame transportFrame) {
p.recvMu.Lock()
if crc, ok := p.delivered[frame.seq]; ok && crc == frame.crc {
p.recvMu.Unlock()
// Already-delivered duplicate: the peer is genuine (we accepted
// this seq earlier and CRC-matched it), so pin and re-ack.
p.pinPeerChannel(frame.channelID)
p.sendAck(frame.seq, frame.crc)
return
}
@@ -641,11 +635,6 @@ func (p *streamTransport) handleInboundFrame(frame transportFrame) {
p.delivered[frame.seq] = msg.crc
p.recvMu.Unlock()
// CRC validated end-to-end — this is our real partner. Pin their
// channelID so future stray frames from other MUC participants are
// dropped before reaching the reassembler.
p.pinPeerChannel(frame.channelID)
if p.onData != nil {
p.onData(data)
}
@@ -656,7 +645,7 @@ func (p *streamTransport) sendAck(seq, crc uint32) {
_ = p.enqueueFrame(encodeAckFrame(p.localChannelID, seq, crc), true)
}
func (p *streamTransport) resolveAck(channelID, seq, crc uint32) {
func (p *streamTransport) resolveAck(seq, crc uint32) {
p.ackMu.Lock()
waiter := p.ackWaiters[seq]
p.ackMu.Unlock()
@@ -665,10 +654,6 @@ func (p *streamTransport) resolveAck(channelID, seq, crc uint32) {
return
}
// The ACK matched a seq we're actually waiting for, so it came from our
// real partner; pin their channelID for downstream filtering.
p.pinPeerChannel(channelID)
select {
case waiter <- crc:
default:

View File

@@ -153,7 +153,7 @@ func TestSendAckAndClosePaths(t *testing.T) {
if err != nil {
t.Fatalf("decodeTransportFrame() error = %v", err)
}
tr.resolveAck(decoded.channelID, decoded.seq, crc32.ChecksumIEEE(payload))
tr.resolveAck(decoded.seq, crc32.ChecksumIEEE(payload))
case <-time.After(time.Second):
t.Fatal("Send() did not enqueue frame")
}