From 7e810acd34b0e17b27360d155b4baa40e6169ba3 Mon Sep 17 00:00:00 2001 From: zarazaex69 Date: Thu, 7 May 2026 20:43:32 +0300 Subject: [PATCH] upd: trandport go fix maybe --- internal/transport/vp8channel/transport.go | 83 ++++++++++++++----- .../vp8channel/transport_unit_test.go | 16 +++- 2 files changed, 78 insertions(+), 21 deletions(-) diff --git a/internal/transport/vp8channel/transport.go b/internal/transport/vp8channel/transport.go index f0c314c..6b11290 100644 --- a/internal/transport/vp8channel/transport.go +++ b/internal/transport/vp8channel/transport.go @@ -1,3 +1,5 @@ + + package vp8channel import ( @@ -12,6 +14,7 @@ import ( "time" "github.com/openlibrecommunity/olcrtc/internal/carrier" + "github.com/openlibrecommunity/olcrtc/internal/logger" "github.com/openlibrecommunity/olcrtc/internal/transport" "github.com/pion/rtp" "github.com/pion/rtp/codecs" @@ -166,24 +169,7 @@ func (p *streamTransport) Connect(ctx context.Context) error { go p.writerLoop() }) - // Start KCP immediately. Don't wait for the peer's video track: - // the server may legitimately come up before any client joins the - // room, and KCP itself does not need a handshake. Once the peer - // shows up, handleRemoteTrack starts pumping their RTP into our - // session and the epoch-change detector handles peer restarts. - var startErr error - p.kcpOnce.Do(func() { - rt, err := startKCP(p.outbound, p.onData, p.epochHeader()) - if err != nil { - startErr = err - return - } - p.kcpMu.Lock() - p.kcp = rt - p.kcpMu.Unlock() - }) - - return startErr + return nil } // epochHeader returns the 5-byte VP8-frame header used to tag every KCP @@ -462,23 +448,67 @@ func (s *vp8FrameState) processRTPPacket(pkt *rtp.Packet) []byte { func (p *streamTransport) readVP8Track(track *webrtc.TrackRemote) { var state vp8FrameState buf := make([]byte, rtpBufSize) + var rtpCount, frameCount uint64 + var unmarshalErr, depackErr uint64 for { n, _, err := track.Read(buf) if err != nil { + logger.Infof("vp8channel: readVP8Track exit err=%v rtpPkts=%d frames=%d unmarshalErr=%d depackErr=%d", + err, rtpCount, frameCount, unmarshalErr, depackErr) return } pkt := &rtp.Packet{} if pkt.Unmarshal(buf[:n]) != nil { + unmarshalErr++ continue } + rtpCount++ + if rtpCount == 1 || rtpCount == 10 || rtpCount == 100 { + logger.Infof("vp8channel: rtp#%d seq=%d marker=%v payloadLen=%d payloadFirst=%x", + rtpCount, pkt.SequenceNumber, pkt.Marker, len(pkt.Payload), + func() []byte { + if len(pkt.Payload) > 8 { + return pkt.Payload[:8] + } + return pkt.Payload + }()) + } + + var vp8Pkt codecs.VP8Packet + vp8Payload, derr := vp8Pkt.Unmarshal(pkt.Payload) + if derr != nil { + depackErr++ + if depackErr <= 3 { + logger.Infof("vp8channel: VP8 depack error #%d: %v payloadFirst=%x", depackErr, derr, + func() []byte { + if len(pkt.Payload) > 8 { + return pkt.Payload[:8] + } + return pkt.Payload + }()) + } + } else if rtpCount <= 3 { + logger.Infof("vp8channel: vp8pkt S=%d marker=%v payloadLen=%d", vp8Pkt.S, pkt.Marker, len(vp8Payload)) + } + frame := state.processRTPPacket(pkt) if frame == nil { continue } + frameCount++ + if frameCount <= 10 { + preview := frame + if len(preview) > 16 { + preview = preview[:16] + } + logger.Infof("vp8channel: frame #%d rtpPkts=%d len=%d first=%x magic=%v", + frameCount, rtpCount, len(frame), preview, frame[0] == kcpFrameMagic) + } + p.handleIncomingFrame(frame) } } @@ -487,7 +517,9 @@ func (p *streamTransport) readVP8Track(track *webrtc.TrackRemote) { // payload to the local session or triggers a reset when the peer's epoch // changes (peer process restart). func (p *streamTransport) handleIncomingFrame(frame []byte) { - if binary.BigEndian.Uint32(frame[tokenOff:epochOff]) != p.bindingToken { + frameToken := binary.BigEndian.Uint32(frame[tokenOff:epochOff]) + if frameToken != p.bindingToken { + logger.Debugf("vp8channel: frame token mismatch got=0x%08x want=0x%08x (foreign client or noise)", frameToken, p.bindingToken) return } peerEpoch := binary.BigEndian.Uint32(frame[epochOff:epochHdrLen]) @@ -500,11 +532,24 @@ func (p *streamTransport) handleIncomingFrame(frame []byte) { // treat them as peer traffic, epoch tracking toggles between "self" and // "peer" and both sides loop forever resetting smux/KCP. if peerEpoch == p.localEpoch { + logger.Debugf("vp8channel: self-echo detected epoch=0x%08x (SFU reflects our own track)", peerEpoch) return } if !p.hadPeer.Swap(true) { p.peerEpoch.Store(peerEpoch) + logger.Infof("vp8channel: peer first seen epoch=0x%08x token=0x%08x", peerEpoch, binary.BigEndian.Uint32(frame[tokenOff:epochOff])) + p.kcpOnce.Do(func() { + rt, err := startKCP(p.outbound, p.onData, p.epochHeader()) + if err != nil { + logger.Infof("vp8channel: startKCP failed: %v", err) + return + } + p.kcpMu.Lock() + p.kcp = rt + p.kcpMu.Unlock() + logger.Infof("vp8channel: KCP started localEpoch=0x%08x", p.localEpoch) + }) } else if prev := p.peerEpoch.Load(); prev != peerEpoch { // Peer restarted its KCP session. Reset ours so the conv state // machines re-converge. CAS guards against double-reset when diff --git a/internal/transport/vp8channel/transport_unit_test.go b/internal/transport/vp8channel/transport_unit_test.go index b7789c9..e34e856 100644 --- a/internal/transport/vp8channel/transport_unit_test.go +++ b/internal/transport/vp8channel/transport_unit_test.go @@ -100,8 +100,8 @@ func TestNewConnectSendCallbacksFeaturesAndClose(t *testing.T) { if err := tr.Connect(context.Background()); err != nil { t.Fatalf("Connect() error = %v", err) } - if tr.kcp == nil || !tr.writerUp.Load() { - t.Fatal("Connect() did not initialize kcp/writer") + if tr.kcp != nil || !tr.writerUp.Load() { + t.Fatal("Connect() should not initialize kcp before peer arrives") } tr.SetReconnectCallback(func() {}) tr.SetShouldReconnect(func() bool { return true }) @@ -110,6 +110,18 @@ func TestNewConnectSendCallbacksFeaturesAndClose(t *testing.T) { if stream.reconnect == nil || stream.should == nil || stream.ended == nil || !stream.watched { t.Fatal("callbacks/watch were not forwarded") } + + peerEpoch := uint32(0x200) + firstFrame := make([]byte, epochHdrLen+4) + firstFrame[0] = kcpFrameMagic + binary.BigEndian.PutUint32(firstFrame[tokenOff:epochOff], tr.bindingToken) + binary.BigEndian.PutUint32(firstFrame[epochOff:epochHdrLen], peerEpoch) + copy(firstFrame[epochHdrLen:], []byte("data")) + tr.handleIncomingFrame(firstFrame) + if tr.kcp == nil { + t.Fatal("kcp not initialized after first peer frame") + } + if !tr.CanSend() { t.Fatal("CanSend() = false, want true") }