From 75e2674f48f35ba63d1527b049d397cbb30d9990 Mon Sep 17 00:00:00 2001 From: zarazaex69 Date: Fri, 15 May 2026 22:47:32 +0300 Subject: [PATCH] fix(transport): isolate peer frames by channel id --- internal/e2e/tunnel_test.go | 25 ++- .../transport/seichannel/frame_extra_test.go | 5 +- internal/transport/seichannel/transport.go | 164 +++++++++++------- .../transport/seichannel/transport_test.go | 5 +- internal/transport/videochannel/frame.go | 51 +++--- .../videochannel/frame_extra_test.go | 5 +- internal/transport/videochannel/transport.go | 44 ++++- .../transport/videochannel/transport_test.go | 5 +- 8 files changed, 210 insertions(+), 94 deletions(-) diff --git a/internal/e2e/tunnel_test.go b/internal/e2e/tunnel_test.go index 205d6a7..d9e6764 100644 --- a/internal/e2e/tunnel_test.go +++ b/internal/e2e/tunnel_test.go @@ -4,7 +4,9 @@ import ( "bufio" "bytes" "context" + cryptorand "crypto/rand" "encoding/binary" + "encoding/hex" "errors" "flag" "fmt" @@ -533,6 +535,26 @@ func realRoomURL(ctx context.Context, t *testing.T, carrierName string) string { } } +// perSubtestRoomURL adds a fresh random suffix to the jitsi room slug for +// each subtest so subtests don't share a MUC — cross-subtest RTP echo from +// closed peer connections was leaking into the next subtest's transport and +// poisoning its handshake. Other carriers create real rooms server-side and +// already get unique ids per matrix entry, so they're left untouched. +func perSubtestRoomURL(carrierName, roomURL string) string { + if carrierName != "jitsi" { + return roomURL + } + var b [4]byte + suffix := fmt.Sprintf("%08x", time.Now().UnixNano()) + if _, err := cryptorand.Read(b[:]); err == nil { + suffix = hex.EncodeToString(b[:]) + } + if i := strings.LastIndex(roomURL, "/"); i >= 0 { + return roomURL[:i+1] + roomURL[i+1:] + "-" + suffix + } + return roomURL + "-" + suffix +} + func requireRealRoom(ctx context.Context, t *testing.T, carrierName string) string { t.Helper() @@ -1013,7 +1035,8 @@ func TestRealProviderTransportMatrix(t *testing.T) { } expectation := realE2ECaseExpectation(carrierName, transportName) label := realE2EExpectationLabel(expectation) - err := runRealE2ECase(t, carrierName, transportName, roomURL, echoAddr) + caseRoomURL := perSubtestRoomURL(carrierName, roomURL) + err := runRealE2ECase(t, carrierName, transportName, caseRoomURL, echoAddr) if err != nil && errors.Is(err, carrier.ErrAuthFailed) { authFailed = true t.Skipf("skip %s real e2e: auth failed: %v", carrierName, err) diff --git a/internal/transport/seichannel/frame_extra_test.go b/internal/transport/seichannel/frame_extra_test.go index 206e403..89127b4 100644 --- a/internal/transport/seichannel/frame_extra_test.go +++ b/internal/transport/seichannel/frame_extra_test.go @@ -42,11 +42,12 @@ func TestDecodeTransportFrameErrorsAndAck(t *testing.T) { } } - ack, err := decodeTransportFrame(encodeAckFrame(7, 0x1234)) + ack, err := decodeTransportFrame(encodeAckFrame(0xabcdef, 7, 0x1234)) if err != nil { t.Fatalf("decode ack error = %v", err) } - if ack.typ != frameTypeAck || ack.seq != 7 || ack.crc != 0x1234 { + if ack.typ != frameTypeAck || ack.channelID != 0xabcdef || + ack.seq != 7 || ack.crc != 0x1234 { t.Fatalf("ack = %+v", ack) } } diff --git a/internal/transport/seichannel/transport.go b/internal/transport/seichannel/transport.go index 6cb7f9b..50c1e8a 100644 --- a/internal/transport/seichannel/transport.go +++ b/internal/transport/seichannel/transport.go @@ -32,7 +32,7 @@ const ( maxSendAttempts = 4 sampleBuilderMaxLate = 128 protocolMagic uint32 = 0x4f564331 // OVC1 - protocolVersion byte = 1 + protocolVersion byte = 2 frameTypeData byte = 1 frameTypeAck byte = 2 ) @@ -60,6 +60,7 @@ var ( type transportFrame struct { typ byte + channelID uint32 seq uint32 crc uint32 totalLen uint32 @@ -76,27 +77,29 @@ type inboundMessage struct { } type streamTransport struct { - stream carrier.VideoTrack - track *webrtc.TrackLocalStaticSample - onData func([]byte) - outbound chan []byte - outboundAck chan []byte - closeCh chan struct{} - writerDone chan struct{} - nextSeq atomic.Uint32 - closed atomic.Bool - writerUp atomic.Bool - sendMu sync.Mutex - startWriter sync.Once - ackMu sync.Mutex - ackWaiters map[uint32]chan uint32 - recvMu sync.Mutex - inbound map[uint32]*inboundMessage - delivered map[uint32]uint32 - fragmentSize int - ackTimeout time.Duration - frameInterval time.Duration - batchSize int + stream carrier.VideoTrack + track *webrtc.TrackLocalStaticSample + onData func([]byte) + outbound chan []byte + outboundAck chan []byte + closeCh chan struct{} + writerDone chan struct{} + nextSeq atomic.Uint32 + closed atomic.Bool + writerUp atomic.Bool + localChannelID uint32 + peerChannelID atomic.Uint32 + sendMu sync.Mutex + startWriter sync.Once + ackMu sync.Mutex + ackWaiters map[uint32]chan uint32 + recvMu sync.Mutex + inbound map[uint32]*inboundMessage + delivered map[uint32]uint32 + fragmentSize int + ackTimeout time.Duration + frameInterval time.Duration + batchSize int } // New creates a seichannel transport backed by a carrier. @@ -160,20 +163,21 @@ func New(ctx context.Context, cfg transport.Config) (transport.Transport, error) } tr := &streamTransport{ - stream: stream, - track: track, - onData: cfg.OnData, - outbound: make(chan []byte, 256), - outboundAck: make(chan []byte, 64), - closeCh: make(chan struct{}), - writerDone: make(chan struct{}), - ackWaiters: make(map[uint32]chan uint32), - inbound: make(map[uint32]*inboundMessage), - delivered: make(map[uint32]uint32), - fragmentSize: fragmentSize, - ackTimeout: ackTimeout, - frameInterval: time.Second / time.Duration(fps), - batchSize: batchSize, + stream: stream, + track: track, + onData: cfg.OnData, + outbound: make(chan []byte, 256), + outboundAck: make(chan []byte, 64), + closeCh: make(chan struct{}), + writerDone: make(chan struct{}), + localChannelID: newChannelID(), + ackWaiters: make(map[uint32]chan uint32), + inbound: make(map[uint32]*inboundMessage), + delivered: make(map[uint32]uint32), + fragmentSize: fragmentSize, + ackTimeout: ackTimeout, + frameInterval: time.Second / time.Duration(fps), + batchSize: batchSize, } err = stream.AddTrack(track) @@ -227,7 +231,7 @@ func (p *streamTransport) Send(data []byte) error { for range maxSendAttempts { for idx, fragment := range fragments { - frame := encodeDataFrame(seq, crc, len(data), idx, len(fragments), fragment) + frame := encodeDataFrame(p.localChannelID, seq, crc, len(data), idx, len(fragments), fragment) if err := p.enqueueFrame(frame, false); err != nil { return err } @@ -442,6 +446,14 @@ func (p *streamTransport) handleSample(sample []byte) { continue } + // Multi-party MUCs (e.g. Jitsi) can deliver frames from other + // peers — or RTP echo from previously-closed sessions — to our + // PeerConnection. The first valid frame we see fixes the peer's + // channelID; later frames with a different ID are silently dropped. + if !p.acceptChannel(frame.channelID) { + continue + } + switch frame.typ { case frameTypeAck: p.resolveAck(frame.seq, frame.crc) @@ -451,6 +463,16 @@ func (p *streamTransport) handleSample(sample []byte) { } } +func (p *streamTransport) acceptChannel(id uint32) bool { + if id == 0 { + return false + } + if p.peerChannelID.CompareAndSwap(0, id) { + return true + } + return p.peerChannelID.Load() == id +} + func (p *streamTransport) upsertInbound(frame transportFrame) (*inboundMessage, bool) { msg, ok := p.inbound[frame.seq] if !ok || msg.crc != frame.crc || msg.totalLen != frame.totalLen || len(msg.frags) != int(frame.fragTotal) { @@ -520,7 +542,7 @@ func (p *streamTransport) handleInboundFrame(frame transportFrame) { } func (p *streamTransport) sendAck(seq, crc uint32) { - _ = p.enqueueFrame(encodeAckFrame(seq, crc), true) + _ = p.enqueueFrame(encodeAckFrame(p.localChannelID, seq, crc), true) } func (p *streamTransport) resolveAck(seq, crc uint32) { @@ -555,27 +577,29 @@ func fragmentPayload(data []byte, maxSize int) [][]byte { return out } -func encodeDataFrame(seq, crc uint32, totalLen, fragIdx, fragTotal int, payload []byte) []byte { - out := make([]byte, 22+len(payload)) +func encodeDataFrame(channelID, seq, crc uint32, totalLen, fragIdx, fragTotal int, payload []byte) []byte { + out := make([]byte, 26+len(payload)) binary.BigEndian.PutUint32(out[0:4], protocolMagic) out[4] = protocolVersion out[5] = frameTypeData - binary.BigEndian.PutUint32(out[6:10], seq) - binary.BigEndian.PutUint32(out[10:14], crc) - binary.BigEndian.PutUint32(out[14:18], uint32(totalLen)) //nolint:gosec,lll // G115: bounded conversion verified by surrounding logic - binary.BigEndian.PutUint16(out[18:20], uint16(fragIdx)) //nolint:gosec,lll // G115: bounded conversion verified by surrounding logic - binary.BigEndian.PutUint16(out[20:22], uint16(fragTotal)) //nolint:gosec,lll // G115: bounded conversion verified by surrounding logic - copy(out[22:], payload) + binary.BigEndian.PutUint32(out[6:10], channelID) + binary.BigEndian.PutUint32(out[10:14], seq) + binary.BigEndian.PutUint32(out[14:18], crc) + binary.BigEndian.PutUint32(out[18:22], uint32(totalLen)) //nolint:gosec,lll // G115: bounded conversion verified by surrounding logic + binary.BigEndian.PutUint16(out[22:24], uint16(fragIdx)) //nolint:gosec,lll // G115: bounded conversion verified by surrounding logic + binary.BigEndian.PutUint16(out[24:26], uint16(fragTotal)) //nolint:gosec,lll // G115: bounded conversion verified by surrounding logic + copy(out[26:], payload) return out } -func encodeAckFrame(seq, crc uint32) []byte { - out := make([]byte, 14) +func encodeAckFrame(channelID, seq, crc uint32) []byte { + out := make([]byte, 18) binary.BigEndian.PutUint32(out[0:4], protocolMagic) out[4] = protocolVersion out[5] = frameTypeAck - binary.BigEndian.PutUint32(out[6:10], seq) - binary.BigEndian.PutUint32(out[10:14], crc) + binary.BigEndian.PutUint32(out[6:10], channelID) + binary.BigEndian.PutUint32(out[10:14], seq) + binary.BigEndian.PutUint32(out[14:18], crc) return out } @@ -593,22 +617,24 @@ func decodeTransportFrame(data []byte) (transportFrame, error) { frame := transportFrame{typ: data[5]} switch frame.typ { case frameTypeAck: - if len(data) < 14 { + if len(data) < 18 { return transportFrame{}, ErrAckTooShort } - frame.seq = binary.BigEndian.Uint32(data[6:10]) - frame.crc = binary.BigEndian.Uint32(data[10:14]) + frame.channelID = binary.BigEndian.Uint32(data[6:10]) + frame.seq = binary.BigEndian.Uint32(data[10:14]) + frame.crc = binary.BigEndian.Uint32(data[14:18]) return frame, nil case frameTypeData: - if len(data) < 22 { + if len(data) < 26 { return transportFrame{}, ErrDataTooShort } - frame.seq = binary.BigEndian.Uint32(data[6:10]) - frame.crc = binary.BigEndian.Uint32(data[10:14]) - frame.totalLen = binary.BigEndian.Uint32(data[14:18]) - frame.fragIdx = binary.BigEndian.Uint16(data[18:20]) - frame.fragTotal = binary.BigEndian.Uint16(data[20:22]) - frame.payload = append([]byte(nil), data[22:]...) + frame.channelID = binary.BigEndian.Uint32(data[6:10]) + frame.seq = binary.BigEndian.Uint32(data[10:14]) + frame.crc = binary.BigEndian.Uint32(data[14:18]) + frame.totalLen = binary.BigEndian.Uint32(data[18:22]) + frame.fragIdx = binary.BigEndian.Uint16(data[22:24]) + frame.fragTotal = binary.BigEndian.Uint16(data[24:26]) + frame.payload = append([]byte(nil), data[26:]...) return frame, nil default: return transportFrame{}, ErrUnexpectedFrameType @@ -625,3 +651,21 @@ func randomID() string { } return hex.EncodeToString(b[:]) } + +// newChannelID picks a non-zero random uint32 that tags every frame this +// peer emits. The receiving side pins the first non-zero channelID it sees +// and ignores frames carrying any other value, which is how we tell our +// real partner apart from other MUC participants and from leftover RTP +// echo of closed sessions. +func newChannelID() uint32 { + var b [4]byte + for { + if _, err := rand.Read(b[:]); err != nil { + return uint32(time.Now().UnixNano()) | 1 //nolint:gosec // G115: intentional truncation + } + id := binary.BigEndian.Uint32(b[:]) + if id != 0 { + return id + } + } +} diff --git a/internal/transport/seichannel/transport_test.go b/internal/transport/seichannel/transport_test.go index 8f11c6f..657977f 100644 --- a/internal/transport/seichannel/transport_test.go +++ b/internal/transport/seichannel/transport_test.go @@ -63,12 +63,13 @@ func TestSEIRoundTripThroughRTPPacketizerAndSampleBuilder(t *testing.T) { } func TestTransportFrameRoundTrip(t *testing.T) { - encoded := encodeDataFrame(42, 0xdeadbeef, 1024, 1, 3, []byte("chunk")) + encoded := encodeDataFrame(0xc0ffee, 42, 0xdeadbeef, 1024, 1, 3, []byte("chunk")) decoded, err := decodeTransportFrame(encoded) if err != nil { t.Fatalf("decodeTransportFrame failed: %v", err) } - if decoded.typ != frameTypeData || decoded.seq != 42 || decoded.crc != 0xdeadbeef { + if decoded.typ != frameTypeData || decoded.channelID != 0xc0ffee || + decoded.seq != 42 || decoded.crc != 0xdeadbeef { t.Fatalf("unexpected frame header: %+v", decoded) } if decoded.totalLen != 1024 || decoded.fragIdx != 1 || decoded.fragTotal != 3 { diff --git a/internal/transport/videochannel/frame.go b/internal/transport/videochannel/frame.go index 30233a8..5e6c329 100644 --- a/internal/transport/videochannel/frame.go +++ b/internal/transport/videochannel/frame.go @@ -7,7 +7,7 @@ import ( const ( protocolMagic uint32 = 0x4f565632 // OVV2 - protocolVersion byte = 1 + protocolVersion byte = 2 frameTypeData byte = 1 frameTypeAck byte = 2 ) @@ -29,6 +29,7 @@ var ( type transportFrame struct { typ byte + channelID uint32 seq uint32 crc uint32 totalLen uint32 @@ -64,27 +65,29 @@ func fragmentPayload(data []byte, maxSize int) [][]byte { return out } -func encodeDataFrame(seq, crc uint32, totalLen, fragIdx, fragTotal int, payload []byte) []byte { - out := make([]byte, 22+len(payload)) +func encodeDataFrame(channelID, seq, crc uint32, totalLen, fragIdx, fragTotal int, payload []byte) []byte { + out := make([]byte, 26+len(payload)) binary.BigEndian.PutUint32(out[0:4], protocolMagic) out[4] = protocolVersion out[5] = frameTypeData - binary.BigEndian.PutUint32(out[6:10], seq) - binary.BigEndian.PutUint32(out[10:14], crc) - binary.BigEndian.PutUint32(out[14:18], uint32(totalLen)) //nolint:gosec,lll // G115: bounded conversion verified by surrounding logic - binary.BigEndian.PutUint16(out[18:20], uint16(fragIdx)) //nolint:gosec,lll // G115: bounded conversion verified by surrounding logic - binary.BigEndian.PutUint16(out[20:22], uint16(fragTotal)) //nolint:gosec,lll // G115: bounded conversion verified by surrounding logic - copy(out[22:], payload) + binary.BigEndian.PutUint32(out[6:10], channelID) + binary.BigEndian.PutUint32(out[10:14], seq) + binary.BigEndian.PutUint32(out[14:18], crc) + binary.BigEndian.PutUint32(out[18:22], uint32(totalLen)) //nolint:gosec,lll // G115: bounded conversion verified by surrounding logic + binary.BigEndian.PutUint16(out[22:24], uint16(fragIdx)) //nolint:gosec,lll // G115: bounded conversion verified by surrounding logic + binary.BigEndian.PutUint16(out[24:26], uint16(fragTotal)) //nolint:gosec,lll // G115: bounded conversion verified by surrounding logic + copy(out[26:], payload) return out } -func encodeAckFrame(seq, crc uint32) []byte { - out := make([]byte, 14) +func encodeAckFrame(channelID, seq, crc uint32) []byte { + out := make([]byte, 18) binary.BigEndian.PutUint32(out[0:4], protocolMagic) out[4] = protocolVersion out[5] = frameTypeAck - binary.BigEndian.PutUint32(out[6:10], seq) - binary.BigEndian.PutUint32(out[10:14], crc) + binary.BigEndian.PutUint32(out[6:10], channelID) + binary.BigEndian.PutUint32(out[10:14], seq) + binary.BigEndian.PutUint32(out[14:18], crc) return out } @@ -102,22 +105,24 @@ func decodeTransportFrame(data []byte) (transportFrame, error) { frame := transportFrame{typ: data[5]} switch frame.typ { case frameTypeAck: - if len(data) < 14 { + if len(data) < 18 { return transportFrame{}, ErrAckTooShort } - frame.seq = binary.BigEndian.Uint32(data[6:10]) - frame.crc = binary.BigEndian.Uint32(data[10:14]) + frame.channelID = binary.BigEndian.Uint32(data[6:10]) + frame.seq = binary.BigEndian.Uint32(data[10:14]) + frame.crc = binary.BigEndian.Uint32(data[14:18]) return frame, nil case frameTypeData: - if len(data) < 22 { + if len(data) < 26 { return transportFrame{}, ErrDataTooShort } - frame.seq = binary.BigEndian.Uint32(data[6:10]) - frame.crc = binary.BigEndian.Uint32(data[10:14]) - frame.totalLen = binary.BigEndian.Uint32(data[14:18]) - frame.fragIdx = binary.BigEndian.Uint16(data[18:20]) - frame.fragTotal = binary.BigEndian.Uint16(data[20:22]) - frame.payload = append([]byte(nil), data[22:]...) + frame.channelID = binary.BigEndian.Uint32(data[6:10]) + frame.seq = binary.BigEndian.Uint32(data[10:14]) + frame.crc = binary.BigEndian.Uint32(data[14:18]) + frame.totalLen = binary.BigEndian.Uint32(data[18:22]) + frame.fragIdx = binary.BigEndian.Uint16(data[22:24]) + frame.fragTotal = binary.BigEndian.Uint16(data[24:26]) + frame.payload = append([]byte(nil), data[26:]...) return frame, nil default: return transportFrame{}, ErrUnexpectedFrameType diff --git a/internal/transport/videochannel/frame_extra_test.go b/internal/transport/videochannel/frame_extra_test.go index 075e1b1..80322e1 100644 --- a/internal/transport/videochannel/frame_extra_test.go +++ b/internal/transport/videochannel/frame_extra_test.go @@ -52,11 +52,12 @@ func TestDecodeTransportFrameErrorsAndAck(t *testing.T) { } } - ack, err := decodeTransportFrame(encodeAckFrame(7, 0x1234)) + ack, err := decodeTransportFrame(encodeAckFrame(0xabcdef, 7, 0x1234)) if err != nil { t.Fatalf("decode ack error = %v", err) } - if ack.typ != frameTypeAck || ack.seq != 7 || ack.crc != 0x1234 { + if ack.typ != frameTypeAck || ack.channelID != 0xabcdef || + ack.seq != 7 || ack.crc != 0x1234 { t.Fatalf("ack = %+v", ack) } } diff --git a/internal/transport/videochannel/transport.go b/internal/transport/videochannel/transport.go index 8468100..26ce8fa 100644 --- a/internal/transport/videochannel/transport.go +++ b/internal/transport/videochannel/transport.go @@ -4,6 +4,7 @@ package videochannel import ( "context" "crypto/rand" + "encoding/binary" "encoding/hex" "errors" "fmt" @@ -55,6 +56,8 @@ type streamTransport struct { nextSeq atomic.Uint32 closed atomic.Bool writerUp atomic.Bool + localChannelID uint32 + peerChannelID atomic.Uint32 sendMu sync.Mutex startWriter sync.Once ackMu sync.Mutex @@ -138,6 +141,7 @@ func New(ctx context.Context, cfg transport.Config) (transport.Transport, error) outboundAck: make(chan []byte, 64), closeCh: make(chan struct{}), writerDone: make(chan struct{}), + localChannelID: newChannelID(), ackWaiters: make(map[uint32]chan uint32), inbound: make(map[uint32]*inboundMessage), delivered: make(map[uint32]uint32), @@ -222,7 +226,7 @@ func (p *streamTransport) Send(data []byte) error { for range maxSendAttempts { for idx, fragment := range fragments { - frame := encodeDataFrame(seq, crc, len(data), idx, len(fragments), fragment) + frame := encodeDataFrame(p.localChannelID, seq, crc, len(data), idx, len(fragments), fragment) if err := p.enqueueFrame(frame, false); err != nil { return err } @@ -543,6 +547,14 @@ func (p *streamTransport) handleFrame(frame []byte) { return } + // Multi-party MUCs (e.g. Jitsi) can deliver frames from other peers — or + // video echo from previously-closed sessions — to our PeerConnection. + // The first valid frame we see fixes the peer's channelID; later frames + // with a different ID are silently dropped. + if !p.acceptChannel(decoded.channelID) { + return + } + switch decoded.typ { case frameTypeAck: p.resolveAck(decoded.seq, decoded.crc) @@ -551,6 +563,16 @@ func (p *streamTransport) handleFrame(frame []byte) { } } +func (p *streamTransport) acceptChannel(id uint32) bool { + if id == 0 { + return false + } + if p.peerChannelID.CompareAndSwap(0, id) { + return true + } + return p.peerChannelID.Load() == id +} + func (p *streamTransport) upsertInbound(frame transportFrame) (*inboundMessage, bool) { msg, ok := p.inbound[frame.seq] if !ok || msg.crc != frame.crc || msg.totalLen != frame.totalLen || len(msg.frags) != int(frame.fragTotal) { @@ -620,7 +642,7 @@ func (p *streamTransport) handleInboundFrame(frame transportFrame) { } func (p *streamTransport) sendAck(seq, crc uint32) { - _ = p.enqueueFrame(encodeAckFrame(seq, crc), true) + _ = p.enqueueFrame(encodeAckFrame(p.localChannelID, seq, crc), true) } func (p *streamTransport) resolveAck(seq, crc uint32) { @@ -648,3 +670,21 @@ func randomID() string { } return hex.EncodeToString(b[:]) } + +// newChannelID picks a non-zero random uint32 that tags every frame this +// peer emits. The receiving side pins the first non-zero channelID it sees +// and ignores frames carrying any other value, which is how we tell our +// real partner apart from other MUC participants and from leftover video +// echo of closed sessions. +func newChannelID() uint32 { + var b [4]byte + for { + if _, err := rand.Read(b[:]); err != nil { + return uint32(time.Now().UnixNano()) | 1 //nolint:gosec // G115: intentional truncation + } + id := binary.BigEndian.Uint32(b[:]) + if id != 0 { + return id + } + } +} diff --git a/internal/transport/videochannel/transport_test.go b/internal/transport/videochannel/transport_test.go index 83e0f57..f87501b 100644 --- a/internal/transport/videochannel/transport_test.go +++ b/internal/transport/videochannel/transport_test.go @@ -62,12 +62,13 @@ func TestTileIdleFrameIgnored(t *testing.T) { } func TestTransportFrameRoundTrip(t *testing.T) { - encoded := encodeDataFrame(42, 0xdeadbeef, 1024, 1, 3, []byte("chunk")) + encoded := encodeDataFrame(0xc0ffee, 42, 0xdeadbeef, 1024, 1, 3, []byte("chunk")) decoded, err := decodeTransportFrame(encoded) if err != nil { t.Fatalf("decodeTransportFrame failed: %v", err) } - if decoded.typ != frameTypeData || decoded.seq != 42 || decoded.crc != 0xdeadbeef { + if decoded.typ != frameTypeData || decoded.channelID != 0xc0ffee || + decoded.seq != 42 || decoded.crc != 0xdeadbeef { t.Fatalf("unexpected frame header: %+v", decoded) } if decoded.totalLen != 1024 || decoded.fragIdx != 1 || decoded.fragTotal != 3 {