upd: trandport go fix maybe

This commit is contained in:
zarazaex69
2026-05-07 20:43:32 +03:00
parent f8d8bf326e
commit 7e810acd34
2 changed files with 78 additions and 21 deletions

View File

@@ -1,3 +1,5 @@
package vp8channel
import (
@@ -12,6 +14,7 @@ import (
"time"
"github.com/openlibrecommunity/olcrtc/internal/carrier"
"github.com/openlibrecommunity/olcrtc/internal/logger"
"github.com/openlibrecommunity/olcrtc/internal/transport"
"github.com/pion/rtp"
"github.com/pion/rtp/codecs"
@@ -166,24 +169,7 @@ func (p *streamTransport) Connect(ctx context.Context) error {
go p.writerLoop()
})
// Start KCP immediately. Don't wait for the peer's video track:
// the server may legitimately come up before any client joins the
// room, and KCP itself does not need a handshake. Once the peer
// shows up, handleRemoteTrack starts pumping their RTP into our
// session and the epoch-change detector handles peer restarts.
var startErr error
p.kcpOnce.Do(func() {
rt, err := startKCP(p.outbound, p.onData, p.epochHeader())
if err != nil {
startErr = err
return
}
p.kcpMu.Lock()
p.kcp = rt
p.kcpMu.Unlock()
})
return startErr
return nil
}
// epochHeader returns the 5-byte VP8-frame header used to tag every KCP
@@ -462,23 +448,67 @@ func (s *vp8FrameState) processRTPPacket(pkt *rtp.Packet) []byte {
func (p *streamTransport) readVP8Track(track *webrtc.TrackRemote) {
var state vp8FrameState
buf := make([]byte, rtpBufSize)
var rtpCount, frameCount uint64
var unmarshalErr, depackErr uint64
for {
n, _, err := track.Read(buf)
if err != nil {
logger.Infof("vp8channel: readVP8Track exit err=%v rtpPkts=%d frames=%d unmarshalErr=%d depackErr=%d",
err, rtpCount, frameCount, unmarshalErr, depackErr)
return
}
pkt := &rtp.Packet{}
if pkt.Unmarshal(buf[:n]) != nil {
unmarshalErr++
continue
}
rtpCount++
if rtpCount == 1 || rtpCount == 10 || rtpCount == 100 {
logger.Infof("vp8channel: rtp#%d seq=%d marker=%v payloadLen=%d payloadFirst=%x",
rtpCount, pkt.SequenceNumber, pkt.Marker, len(pkt.Payload),
func() []byte {
if len(pkt.Payload) > 8 {
return pkt.Payload[:8]
}
return pkt.Payload
}())
}
var vp8Pkt codecs.VP8Packet
vp8Payload, derr := vp8Pkt.Unmarshal(pkt.Payload)
if derr != nil {
depackErr++
if depackErr <= 3 {
logger.Infof("vp8channel: VP8 depack error #%d: %v payloadFirst=%x", depackErr, derr,
func() []byte {
if len(pkt.Payload) > 8 {
return pkt.Payload[:8]
}
return pkt.Payload
}())
}
} else if rtpCount <= 3 {
logger.Infof("vp8channel: vp8pkt S=%d marker=%v payloadLen=%d", vp8Pkt.S, pkt.Marker, len(vp8Payload))
}
frame := state.processRTPPacket(pkt)
if frame == nil {
continue
}
frameCount++
if frameCount <= 10 {
preview := frame
if len(preview) > 16 {
preview = preview[:16]
}
logger.Infof("vp8channel: frame #%d rtpPkts=%d len=%d first=%x magic=%v",
frameCount, rtpCount, len(frame), preview, frame[0] == kcpFrameMagic)
}
p.handleIncomingFrame(frame)
}
}
@@ -487,7 +517,9 @@ func (p *streamTransport) readVP8Track(track *webrtc.TrackRemote) {
// payload to the local session or triggers a reset when the peer's epoch
// changes (peer process restart).
func (p *streamTransport) handleIncomingFrame(frame []byte) {
if binary.BigEndian.Uint32(frame[tokenOff:epochOff]) != p.bindingToken {
frameToken := binary.BigEndian.Uint32(frame[tokenOff:epochOff])
if frameToken != p.bindingToken {
logger.Debugf("vp8channel: frame token mismatch got=0x%08x want=0x%08x (foreign client or noise)", frameToken, p.bindingToken)
return
}
peerEpoch := binary.BigEndian.Uint32(frame[epochOff:epochHdrLen])
@@ -500,11 +532,24 @@ func (p *streamTransport) handleIncomingFrame(frame []byte) {
// treat them as peer traffic, epoch tracking toggles between "self" and
// "peer" and both sides loop forever resetting smux/KCP.
if peerEpoch == p.localEpoch {
logger.Debugf("vp8channel: self-echo detected epoch=0x%08x (SFU reflects our own track)", peerEpoch)
return
}
if !p.hadPeer.Swap(true) {
p.peerEpoch.Store(peerEpoch)
logger.Infof("vp8channel: peer first seen epoch=0x%08x token=0x%08x", peerEpoch, binary.BigEndian.Uint32(frame[tokenOff:epochOff]))
p.kcpOnce.Do(func() {
rt, err := startKCP(p.outbound, p.onData, p.epochHeader())
if err != nil {
logger.Infof("vp8channel: startKCP failed: %v", err)
return
}
p.kcpMu.Lock()
p.kcp = rt
p.kcpMu.Unlock()
logger.Infof("vp8channel: KCP started localEpoch=0x%08x", p.localEpoch)
})
} else if prev := p.peerEpoch.Load(); prev != peerEpoch {
// Peer restarted its KCP session. Reset ours so the conv state
// machines re-converge. CAS guards against double-reset when

View File

@@ -100,8 +100,8 @@ func TestNewConnectSendCallbacksFeaturesAndClose(t *testing.T) {
if err := tr.Connect(context.Background()); err != nil {
t.Fatalf("Connect() error = %v", err)
}
if tr.kcp == nil || !tr.writerUp.Load() {
t.Fatal("Connect() did not initialize kcp/writer")
if tr.kcp != nil || !tr.writerUp.Load() {
t.Fatal("Connect() should not initialize kcp before peer arrives")
}
tr.SetReconnectCallback(func() {})
tr.SetShouldReconnect(func() bool { return true })
@@ -110,6 +110,18 @@ func TestNewConnectSendCallbacksFeaturesAndClose(t *testing.T) {
if stream.reconnect == nil || stream.should == nil || stream.ended == nil || !stream.watched {
t.Fatal("callbacks/watch were not forwarded")
}
peerEpoch := uint32(0x200)
firstFrame := make([]byte, epochHdrLen+4)
firstFrame[0] = kcpFrameMagic
binary.BigEndian.PutUint32(firstFrame[tokenOff:epochOff], tr.bindingToken)
binary.BigEndian.PutUint32(firstFrame[epochOff:epochHdrLen], peerEpoch)
copy(firstFrame[epochHdrLen:], []byte("data"))
tr.handleIncomingFrame(firstFrame)
if tr.kcp == nil {
t.Fatal("kcp not initialized after first peer frame")
}
if !tr.CanSend() {
t.Fatal("CanSend() = false, want true")
}