fix(transport): isolate peer frames by channel id

This commit is contained in:
zarazaex69
2026-05-15 22:47:32 +03:00
parent 86193f4749
commit 75e2674f48
8 changed files with 210 additions and 94 deletions

View File

@@ -4,7 +4,9 @@ import (
"bufio"
"bytes"
"context"
cryptorand "crypto/rand"
"encoding/binary"
"encoding/hex"
"errors"
"flag"
"fmt"
@@ -533,6 +535,26 @@ func realRoomURL(ctx context.Context, t *testing.T, carrierName string) string {
}
}
// perSubtestRoomURL adds a fresh random suffix to the jitsi room slug for
// each subtest so subtests don't share a MUC — cross-subtest RTP echo from
// closed peer connections was leaking into the next subtest's transport and
// poisoning its handshake. Other carriers create real rooms server-side and
// already get unique ids per matrix entry, so they're left untouched.
func perSubtestRoomURL(carrierName, roomURL string) string {
if carrierName != "jitsi" {
return roomURL
}
var b [4]byte
suffix := fmt.Sprintf("%08x", time.Now().UnixNano())
if _, err := cryptorand.Read(b[:]); err == nil {
suffix = hex.EncodeToString(b[:])
}
if i := strings.LastIndex(roomURL, "/"); i >= 0 {
return roomURL[:i+1] + roomURL[i+1:] + "-" + suffix
}
return roomURL + "-" + suffix
}
func requireRealRoom(ctx context.Context, t *testing.T, carrierName string) string {
t.Helper()
@@ -1013,7 +1035,8 @@ func TestRealProviderTransportMatrix(t *testing.T) {
}
expectation := realE2ECaseExpectation(carrierName, transportName)
label := realE2EExpectationLabel(expectation)
err := runRealE2ECase(t, carrierName, transportName, roomURL, echoAddr)
caseRoomURL := perSubtestRoomURL(carrierName, roomURL)
err := runRealE2ECase(t, carrierName, transportName, caseRoomURL, echoAddr)
if err != nil && errors.Is(err, carrier.ErrAuthFailed) {
authFailed = true
t.Skipf("skip %s real e2e: auth failed: %v", carrierName, err)

View File

@@ -42,11 +42,12 @@ func TestDecodeTransportFrameErrorsAndAck(t *testing.T) {
}
}
ack, err := decodeTransportFrame(encodeAckFrame(7, 0x1234))
ack, err := decodeTransportFrame(encodeAckFrame(0xabcdef, 7, 0x1234))
if err != nil {
t.Fatalf("decode ack error = %v", err)
}
if ack.typ != frameTypeAck || ack.seq != 7 || ack.crc != 0x1234 {
if ack.typ != frameTypeAck || ack.channelID != 0xabcdef ||
ack.seq != 7 || ack.crc != 0x1234 {
t.Fatalf("ack = %+v", ack)
}
}

View File

@@ -32,7 +32,7 @@ const (
maxSendAttempts = 4
sampleBuilderMaxLate = 128
protocolMagic uint32 = 0x4f564331 // OVC1
protocolVersion byte = 1
protocolVersion byte = 2
frameTypeData byte = 1
frameTypeAck byte = 2
)
@@ -60,6 +60,7 @@ var (
type transportFrame struct {
typ byte
channelID uint32
seq uint32
crc uint32
totalLen uint32
@@ -76,27 +77,29 @@ type inboundMessage struct {
}
type streamTransport struct {
stream carrier.VideoTrack
track *webrtc.TrackLocalStaticSample
onData func([]byte)
outbound chan []byte
outboundAck chan []byte
closeCh chan struct{}
writerDone chan struct{}
nextSeq atomic.Uint32
closed atomic.Bool
writerUp atomic.Bool
sendMu sync.Mutex
startWriter sync.Once
ackMu sync.Mutex
ackWaiters map[uint32]chan uint32
recvMu sync.Mutex
inbound map[uint32]*inboundMessage
delivered map[uint32]uint32
fragmentSize int
ackTimeout time.Duration
frameInterval time.Duration
batchSize int
stream carrier.VideoTrack
track *webrtc.TrackLocalStaticSample
onData func([]byte)
outbound chan []byte
outboundAck chan []byte
closeCh chan struct{}
writerDone chan struct{}
nextSeq atomic.Uint32
closed atomic.Bool
writerUp atomic.Bool
localChannelID uint32
peerChannelID atomic.Uint32
sendMu sync.Mutex
startWriter sync.Once
ackMu sync.Mutex
ackWaiters map[uint32]chan uint32
recvMu sync.Mutex
inbound map[uint32]*inboundMessage
delivered map[uint32]uint32
fragmentSize int
ackTimeout time.Duration
frameInterval time.Duration
batchSize int
}
// New creates a seichannel transport backed by a carrier.
@@ -160,20 +163,21 @@ func New(ctx context.Context, cfg transport.Config) (transport.Transport, error)
}
tr := &streamTransport{
stream: stream,
track: track,
onData: cfg.OnData,
outbound: make(chan []byte, 256),
outboundAck: make(chan []byte, 64),
closeCh: make(chan struct{}),
writerDone: make(chan struct{}),
ackWaiters: make(map[uint32]chan uint32),
inbound: make(map[uint32]*inboundMessage),
delivered: make(map[uint32]uint32),
fragmentSize: fragmentSize,
ackTimeout: ackTimeout,
frameInterval: time.Second / time.Duration(fps),
batchSize: batchSize,
stream: stream,
track: track,
onData: cfg.OnData,
outbound: make(chan []byte, 256),
outboundAck: make(chan []byte, 64),
closeCh: make(chan struct{}),
writerDone: make(chan struct{}),
localChannelID: newChannelID(),
ackWaiters: make(map[uint32]chan uint32),
inbound: make(map[uint32]*inboundMessage),
delivered: make(map[uint32]uint32),
fragmentSize: fragmentSize,
ackTimeout: ackTimeout,
frameInterval: time.Second / time.Duration(fps),
batchSize: batchSize,
}
err = stream.AddTrack(track)
@@ -227,7 +231,7 @@ func (p *streamTransport) Send(data []byte) error {
for range maxSendAttempts {
for idx, fragment := range fragments {
frame := encodeDataFrame(seq, crc, len(data), idx, len(fragments), fragment)
frame := encodeDataFrame(p.localChannelID, seq, crc, len(data), idx, len(fragments), fragment)
if err := p.enqueueFrame(frame, false); err != nil {
return err
}
@@ -442,6 +446,14 @@ func (p *streamTransport) handleSample(sample []byte) {
continue
}
// Multi-party MUCs (e.g. Jitsi) can deliver frames from other
// peers — or RTP echo from previously-closed sessions — to our
// PeerConnection. The first valid frame we see fixes the peer's
// channelID; later frames with a different ID are silently dropped.
if !p.acceptChannel(frame.channelID) {
continue
}
switch frame.typ {
case frameTypeAck:
p.resolveAck(frame.seq, frame.crc)
@@ -451,6 +463,16 @@ func (p *streamTransport) handleSample(sample []byte) {
}
}
func (p *streamTransport) acceptChannel(id uint32) bool {
if id == 0 {
return false
}
if p.peerChannelID.CompareAndSwap(0, id) {
return true
}
return p.peerChannelID.Load() == id
}
func (p *streamTransport) upsertInbound(frame transportFrame) (*inboundMessage, bool) {
msg, ok := p.inbound[frame.seq]
if !ok || msg.crc != frame.crc || msg.totalLen != frame.totalLen || len(msg.frags) != int(frame.fragTotal) {
@@ -520,7 +542,7 @@ func (p *streamTransport) handleInboundFrame(frame transportFrame) {
}
func (p *streamTransport) sendAck(seq, crc uint32) {
_ = p.enqueueFrame(encodeAckFrame(seq, crc), true)
_ = p.enqueueFrame(encodeAckFrame(p.localChannelID, seq, crc), true)
}
func (p *streamTransport) resolveAck(seq, crc uint32) {
@@ -555,27 +577,29 @@ func fragmentPayload(data []byte, maxSize int) [][]byte {
return out
}
func encodeDataFrame(seq, crc uint32, totalLen, fragIdx, fragTotal int, payload []byte) []byte {
out := make([]byte, 22+len(payload))
func encodeDataFrame(channelID, seq, crc uint32, totalLen, fragIdx, fragTotal int, payload []byte) []byte {
out := make([]byte, 26+len(payload))
binary.BigEndian.PutUint32(out[0:4], protocolMagic)
out[4] = protocolVersion
out[5] = frameTypeData
binary.BigEndian.PutUint32(out[6:10], seq)
binary.BigEndian.PutUint32(out[10:14], crc)
binary.BigEndian.PutUint32(out[14:18], uint32(totalLen)) //nolint:gosec,lll // G115: bounded conversion verified by surrounding logic
binary.BigEndian.PutUint16(out[18:20], uint16(fragIdx)) //nolint:gosec,lll // G115: bounded conversion verified by surrounding logic
binary.BigEndian.PutUint16(out[20:22], uint16(fragTotal)) //nolint:gosec,lll // G115: bounded conversion verified by surrounding logic
copy(out[22:], payload)
binary.BigEndian.PutUint32(out[6:10], channelID)
binary.BigEndian.PutUint32(out[10:14], seq)
binary.BigEndian.PutUint32(out[14:18], crc)
binary.BigEndian.PutUint32(out[18:22], uint32(totalLen)) //nolint:gosec,lll // G115: bounded conversion verified by surrounding logic
binary.BigEndian.PutUint16(out[22:24], uint16(fragIdx)) //nolint:gosec,lll // G115: bounded conversion verified by surrounding logic
binary.BigEndian.PutUint16(out[24:26], uint16(fragTotal)) //nolint:gosec,lll // G115: bounded conversion verified by surrounding logic
copy(out[26:], payload)
return out
}
func encodeAckFrame(seq, crc uint32) []byte {
out := make([]byte, 14)
func encodeAckFrame(channelID, seq, crc uint32) []byte {
out := make([]byte, 18)
binary.BigEndian.PutUint32(out[0:4], protocolMagic)
out[4] = protocolVersion
out[5] = frameTypeAck
binary.BigEndian.PutUint32(out[6:10], seq)
binary.BigEndian.PutUint32(out[10:14], crc)
binary.BigEndian.PutUint32(out[6:10], channelID)
binary.BigEndian.PutUint32(out[10:14], seq)
binary.BigEndian.PutUint32(out[14:18], crc)
return out
}
@@ -593,22 +617,24 @@ func decodeTransportFrame(data []byte) (transportFrame, error) {
frame := transportFrame{typ: data[5]}
switch frame.typ {
case frameTypeAck:
if len(data) < 14 {
if len(data) < 18 {
return transportFrame{}, ErrAckTooShort
}
frame.seq = binary.BigEndian.Uint32(data[6:10])
frame.crc = binary.BigEndian.Uint32(data[10:14])
frame.channelID = binary.BigEndian.Uint32(data[6:10])
frame.seq = binary.BigEndian.Uint32(data[10:14])
frame.crc = binary.BigEndian.Uint32(data[14:18])
return frame, nil
case frameTypeData:
if len(data) < 22 {
if len(data) < 26 {
return transportFrame{}, ErrDataTooShort
}
frame.seq = binary.BigEndian.Uint32(data[6:10])
frame.crc = binary.BigEndian.Uint32(data[10:14])
frame.totalLen = binary.BigEndian.Uint32(data[14:18])
frame.fragIdx = binary.BigEndian.Uint16(data[18:20])
frame.fragTotal = binary.BigEndian.Uint16(data[20:22])
frame.payload = append([]byte(nil), data[22:]...)
frame.channelID = binary.BigEndian.Uint32(data[6:10])
frame.seq = binary.BigEndian.Uint32(data[10:14])
frame.crc = binary.BigEndian.Uint32(data[14:18])
frame.totalLen = binary.BigEndian.Uint32(data[18:22])
frame.fragIdx = binary.BigEndian.Uint16(data[22:24])
frame.fragTotal = binary.BigEndian.Uint16(data[24:26])
frame.payload = append([]byte(nil), data[26:]...)
return frame, nil
default:
return transportFrame{}, ErrUnexpectedFrameType
@@ -625,3 +651,21 @@ func randomID() string {
}
return hex.EncodeToString(b[:])
}
// newChannelID picks a non-zero random uint32 that tags every frame this
// peer emits. The receiving side pins the first non-zero channelID it sees
// and ignores frames carrying any other value, which is how we tell our
// real partner apart from other MUC participants and from leftover RTP
// echo of closed sessions.
func newChannelID() uint32 {
var b [4]byte
for {
if _, err := rand.Read(b[:]); err != nil {
return uint32(time.Now().UnixNano()) | 1 //nolint:gosec // G115: intentional truncation
}
id := binary.BigEndian.Uint32(b[:])
if id != 0 {
return id
}
}
}

View File

@@ -63,12 +63,13 @@ func TestSEIRoundTripThroughRTPPacketizerAndSampleBuilder(t *testing.T) {
}
func TestTransportFrameRoundTrip(t *testing.T) {
encoded := encodeDataFrame(42, 0xdeadbeef, 1024, 1, 3, []byte("chunk"))
encoded := encodeDataFrame(0xc0ffee, 42, 0xdeadbeef, 1024, 1, 3, []byte("chunk"))
decoded, err := decodeTransportFrame(encoded)
if err != nil {
t.Fatalf("decodeTransportFrame failed: %v", err)
}
if decoded.typ != frameTypeData || decoded.seq != 42 || decoded.crc != 0xdeadbeef {
if decoded.typ != frameTypeData || decoded.channelID != 0xc0ffee ||
decoded.seq != 42 || decoded.crc != 0xdeadbeef {
t.Fatalf("unexpected frame header: %+v", decoded)
}
if decoded.totalLen != 1024 || decoded.fragIdx != 1 || decoded.fragTotal != 3 {

View File

@@ -7,7 +7,7 @@ import (
const (
protocolMagic uint32 = 0x4f565632 // OVV2
protocolVersion byte = 1
protocolVersion byte = 2
frameTypeData byte = 1
frameTypeAck byte = 2
)
@@ -29,6 +29,7 @@ var (
type transportFrame struct {
typ byte
channelID uint32
seq uint32
crc uint32
totalLen uint32
@@ -64,27 +65,29 @@ func fragmentPayload(data []byte, maxSize int) [][]byte {
return out
}
func encodeDataFrame(seq, crc uint32, totalLen, fragIdx, fragTotal int, payload []byte) []byte {
out := make([]byte, 22+len(payload))
func encodeDataFrame(channelID, seq, crc uint32, totalLen, fragIdx, fragTotal int, payload []byte) []byte {
out := make([]byte, 26+len(payload))
binary.BigEndian.PutUint32(out[0:4], protocolMagic)
out[4] = protocolVersion
out[5] = frameTypeData
binary.BigEndian.PutUint32(out[6:10], seq)
binary.BigEndian.PutUint32(out[10:14], crc)
binary.BigEndian.PutUint32(out[14:18], uint32(totalLen)) //nolint:gosec,lll // G115: bounded conversion verified by surrounding logic
binary.BigEndian.PutUint16(out[18:20], uint16(fragIdx)) //nolint:gosec,lll // G115: bounded conversion verified by surrounding logic
binary.BigEndian.PutUint16(out[20:22], uint16(fragTotal)) //nolint:gosec,lll // G115: bounded conversion verified by surrounding logic
copy(out[22:], payload)
binary.BigEndian.PutUint32(out[6:10], channelID)
binary.BigEndian.PutUint32(out[10:14], seq)
binary.BigEndian.PutUint32(out[14:18], crc)
binary.BigEndian.PutUint32(out[18:22], uint32(totalLen)) //nolint:gosec,lll // G115: bounded conversion verified by surrounding logic
binary.BigEndian.PutUint16(out[22:24], uint16(fragIdx)) //nolint:gosec,lll // G115: bounded conversion verified by surrounding logic
binary.BigEndian.PutUint16(out[24:26], uint16(fragTotal)) //nolint:gosec,lll // G115: bounded conversion verified by surrounding logic
copy(out[26:], payload)
return out
}
func encodeAckFrame(seq, crc uint32) []byte {
out := make([]byte, 14)
func encodeAckFrame(channelID, seq, crc uint32) []byte {
out := make([]byte, 18)
binary.BigEndian.PutUint32(out[0:4], protocolMagic)
out[4] = protocolVersion
out[5] = frameTypeAck
binary.BigEndian.PutUint32(out[6:10], seq)
binary.BigEndian.PutUint32(out[10:14], crc)
binary.BigEndian.PutUint32(out[6:10], channelID)
binary.BigEndian.PutUint32(out[10:14], seq)
binary.BigEndian.PutUint32(out[14:18], crc)
return out
}
@@ -102,22 +105,24 @@ func decodeTransportFrame(data []byte) (transportFrame, error) {
frame := transportFrame{typ: data[5]}
switch frame.typ {
case frameTypeAck:
if len(data) < 14 {
if len(data) < 18 {
return transportFrame{}, ErrAckTooShort
}
frame.seq = binary.BigEndian.Uint32(data[6:10])
frame.crc = binary.BigEndian.Uint32(data[10:14])
frame.channelID = binary.BigEndian.Uint32(data[6:10])
frame.seq = binary.BigEndian.Uint32(data[10:14])
frame.crc = binary.BigEndian.Uint32(data[14:18])
return frame, nil
case frameTypeData:
if len(data) < 22 {
if len(data) < 26 {
return transportFrame{}, ErrDataTooShort
}
frame.seq = binary.BigEndian.Uint32(data[6:10])
frame.crc = binary.BigEndian.Uint32(data[10:14])
frame.totalLen = binary.BigEndian.Uint32(data[14:18])
frame.fragIdx = binary.BigEndian.Uint16(data[18:20])
frame.fragTotal = binary.BigEndian.Uint16(data[20:22])
frame.payload = append([]byte(nil), data[22:]...)
frame.channelID = binary.BigEndian.Uint32(data[6:10])
frame.seq = binary.BigEndian.Uint32(data[10:14])
frame.crc = binary.BigEndian.Uint32(data[14:18])
frame.totalLen = binary.BigEndian.Uint32(data[18:22])
frame.fragIdx = binary.BigEndian.Uint16(data[22:24])
frame.fragTotal = binary.BigEndian.Uint16(data[24:26])
frame.payload = append([]byte(nil), data[26:]...)
return frame, nil
default:
return transportFrame{}, ErrUnexpectedFrameType

View File

@@ -52,11 +52,12 @@ func TestDecodeTransportFrameErrorsAndAck(t *testing.T) {
}
}
ack, err := decodeTransportFrame(encodeAckFrame(7, 0x1234))
ack, err := decodeTransportFrame(encodeAckFrame(0xabcdef, 7, 0x1234))
if err != nil {
t.Fatalf("decode ack error = %v", err)
}
if ack.typ != frameTypeAck || ack.seq != 7 || ack.crc != 0x1234 {
if ack.typ != frameTypeAck || ack.channelID != 0xabcdef ||
ack.seq != 7 || ack.crc != 0x1234 {
t.Fatalf("ack = %+v", ack)
}
}

View File

@@ -4,6 +4,7 @@ package videochannel
import (
"context"
"crypto/rand"
"encoding/binary"
"encoding/hex"
"errors"
"fmt"
@@ -55,6 +56,8 @@ type streamTransport struct {
nextSeq atomic.Uint32
closed atomic.Bool
writerUp atomic.Bool
localChannelID uint32
peerChannelID atomic.Uint32
sendMu sync.Mutex
startWriter sync.Once
ackMu sync.Mutex
@@ -138,6 +141,7 @@ func New(ctx context.Context, cfg transport.Config) (transport.Transport, error)
outboundAck: make(chan []byte, 64),
closeCh: make(chan struct{}),
writerDone: make(chan struct{}),
localChannelID: newChannelID(),
ackWaiters: make(map[uint32]chan uint32),
inbound: make(map[uint32]*inboundMessage),
delivered: make(map[uint32]uint32),
@@ -222,7 +226,7 @@ func (p *streamTransport) Send(data []byte) error {
for range maxSendAttempts {
for idx, fragment := range fragments {
frame := encodeDataFrame(seq, crc, len(data), idx, len(fragments), fragment)
frame := encodeDataFrame(p.localChannelID, seq, crc, len(data), idx, len(fragments), fragment)
if err := p.enqueueFrame(frame, false); err != nil {
return err
}
@@ -543,6 +547,14 @@ func (p *streamTransport) handleFrame(frame []byte) {
return
}
// Multi-party MUCs (e.g. Jitsi) can deliver frames from other peers — or
// video echo from previously-closed sessions — to our PeerConnection.
// The first valid frame we see fixes the peer's channelID; later frames
// with a different ID are silently dropped.
if !p.acceptChannel(decoded.channelID) {
return
}
switch decoded.typ {
case frameTypeAck:
p.resolveAck(decoded.seq, decoded.crc)
@@ -551,6 +563,16 @@ func (p *streamTransport) handleFrame(frame []byte) {
}
}
func (p *streamTransport) acceptChannel(id uint32) bool {
if id == 0 {
return false
}
if p.peerChannelID.CompareAndSwap(0, id) {
return true
}
return p.peerChannelID.Load() == id
}
func (p *streamTransport) upsertInbound(frame transportFrame) (*inboundMessage, bool) {
msg, ok := p.inbound[frame.seq]
if !ok || msg.crc != frame.crc || msg.totalLen != frame.totalLen || len(msg.frags) != int(frame.fragTotal) {
@@ -620,7 +642,7 @@ func (p *streamTransport) handleInboundFrame(frame transportFrame) {
}
func (p *streamTransport) sendAck(seq, crc uint32) {
_ = p.enqueueFrame(encodeAckFrame(seq, crc), true)
_ = p.enqueueFrame(encodeAckFrame(p.localChannelID, seq, crc), true)
}
func (p *streamTransport) resolveAck(seq, crc uint32) {
@@ -648,3 +670,21 @@ func randomID() string {
}
return hex.EncodeToString(b[:])
}
// newChannelID picks a non-zero random uint32 that tags every frame this
// peer emits. The receiving side pins the first non-zero channelID it sees
// and ignores frames carrying any other value, which is how we tell our
// real partner apart from other MUC participants and from leftover video
// echo of closed sessions.
func newChannelID() uint32 {
var b [4]byte
for {
if _, err := rand.Read(b[:]); err != nil {
return uint32(time.Now().UnixNano()) | 1 //nolint:gosec // G115: intentional truncation
}
id := binary.BigEndian.Uint32(b[:])
if id != 0 {
return id
}
}
}

View File

@@ -62,12 +62,13 @@ func TestTileIdleFrameIgnored(t *testing.T) {
}
func TestTransportFrameRoundTrip(t *testing.T) {
encoded := encodeDataFrame(42, 0xdeadbeef, 1024, 1, 3, []byte("chunk"))
encoded := encodeDataFrame(0xc0ffee, 42, 0xdeadbeef, 1024, 1, 3, []byte("chunk"))
decoded, err := decodeTransportFrame(encoded)
if err != nil {
t.Fatalf("decodeTransportFrame failed: %v", err)
}
if decoded.typ != frameTypeData || decoded.seq != 42 || decoded.crc != 0xdeadbeef {
if decoded.typ != frameTypeData || decoded.channelID != 0xc0ffee ||
decoded.seq != 42 || decoded.crc != 0xdeadbeef {
t.Fatalf("unexpected frame header: %+v", decoded)
}
if decoded.totalLen != 1024 || decoded.fragIdx != 1 || decoded.fragTotal != 3 {