From 52aea2d79d2f4234fea9332afce779beb9a6ebfc Mon Sep 17 00:00:00 2001 From: zarazaex69 Date: Fri, 29 May 2026 19:16:59 +0300 Subject: [PATCH] fix(vp8channel): add byte-rate pacer to prevent policer collapse --- internal/control/control.go | 9 +++-- internal/transport/vp8channel/kcp.go | 31 +++++++++------- internal/transport/vp8channel/options.go | 8 +++++ internal/transport/vp8channel/transport.go | 36 ++++++++++++++----- .../transport/vp8channel/transport_test.go | 2 +- 5 files changed, 62 insertions(+), 24 deletions(-) diff --git a/internal/control/control.go b/internal/control/control.go index 450f340..ce2502d 100644 --- a/internal/control/control.go +++ b/internal/control/control.go @@ -29,11 +29,14 @@ const ( MaxMessageSize = 16 * 1024 // DefaultInterval is the default interval between ping probes. DefaultInterval = 10 * time.Second - // DefaultTimeout is the default time to wait for a pong. - DefaultTimeout = 5 * time.Second + // DefaultTimeout is the default time to wait for a pong. Generous because + // the ping shares the bulk smux/KCP stream: under a heavy transfer the + // ping byte can be head-of-line blocked behind queued data for several + // seconds, which is liveness-OK, not a dead link. + DefaultTimeout = 15 * time.Second // DefaultFailures is the default number of consecutive missed pongs before // the stream is marked unhealthy. - DefaultFailures = 3 + DefaultFailures = 4 ) // MsgType labels a control message. diff --git a/internal/transport/vp8channel/kcp.go b/internal/transport/vp8channel/kcp.go index 260fb3e..d984d11 100644 --- a/internal/transport/vp8channel/kcp.go +++ b/internal/transport/vp8channel/kcp.go @@ -27,11 +27,17 @@ const ( // clamped. Stay below that with headroom for KCP overhead (24 bytes). kcpMTU = 1400 - // Receive/send window in segments. Large window allows in-flight bursts - // without stalling - important when one VP8 frame may carry many KCP - // segments and ACKs trickle back at frame cadence. - kcpSndWnd = 4096 - kcpRcvWnd = 4096 + // Send/receive window in segments, sized to the bandwidth-delay product + // of the policed video path (~1.2 MB/s wire cap, sub-second RTT), NOT to + // "as much as possible". A large send window let the upper layer dump + // megabytes into KCP instantly; with the wire paced to ~1.2 MB/s those + // segments then sat queued for SECONDS, so KCP's RTO fired and triggered + // a retransmit storm while control-plane pongs starved behind the same + // queue (-> missed pongs -> reconnect). A small send window bounds + // in-flight data to ~BDP, keeping queuing latency low. The receive + // window stays generous so the peer is never the bottleneck. + kcpSndWnd = 768 + kcpRcvWnd = 1024 // Length prefix for our message framing on top of KCP stream mode. // We use stream mode because UDPSession.Write fragments messages > MSS @@ -69,13 +75,14 @@ func startKCP(out chan<- []byte, onData func([]byte), epochHdr [epochHdrLen]byte return nil, fmt.Errorf("kcp new conn: %w", err) } - // Aggressive ARQ tuning: nodelay=1, interval=5ms, fast resend=2, no - // congestion control. The 5ms tick (vs the kcptun-default 10ms) halves - // the worst-case scheduling latency in each direction, which matters a - // lot for interactive workloads (SOCKS5 + HTTP request needs ~3 RTTs - // before the first byte of the response shows up). Below 5ms the - // CPU cost of the KCP update loop starts climbing without much - // additional latency benefit. + // nodelay=1, interval=5ms, fast resend=2, congestion control OFF (nc=1). + // KCP does NOT regulate the send rate here — the writerLoop byte pacer + // does, fed at a fixed rate just under the carrier's policer knee. KCP's + // own loss-based congestion control is the wrong controller for a hard + // policer: with nc=0 the unavoidable ~4% drops collapsed cwnd and starved + // the wire to ~45 KiB/s. With nc=1 KCP just keeps the BDP-sized window + // full and retransmits the few losses; the pacer caps the rate so we + // never overdrive the policer into its collapse zone. sess.SetNoDelay(1, 5, 2, 1) sess.SetWindowSize(kcpSndWnd, kcpRcvWnd) sess.SetMtu(kcpMTU) diff --git a/internal/transport/vp8channel/options.go b/internal/transport/vp8channel/options.go index 7e12733..d005e6f 100644 --- a/internal/transport/vp8channel/options.go +++ b/internal/transport/vp8channel/options.go @@ -9,12 +9,20 @@ import ( const ( defaultFPS = 60 defaultBatchSize = 64 + // defaultMaxBytesPerSec paces the wire byte-rate just under the Telemost + // SFU's measured per-slot policer knee (~1.4 MiB/s). Above it the SFU + // drops bursts wholesale, collapsing goodput and starving keepalives; + // staying under keeps loss near zero. See TestRealRawVP8Throughput. + defaultMaxBytesPerSec = 1_200_000 ) // Options tunes the vp8channel transport. Zero values fall back to documented defaults. type Options struct { FPS int BatchSize int + // MaxBytesPerSec caps the wire byte-rate fed to the video track. Zero + // falls back to defaultMaxBytesPerSec. + MaxBytesPerSec int } // TransportOptions marks Options as belonging to the transport options family. diff --git a/internal/transport/vp8channel/transport.go b/internal/transport/vp8channel/transport.go index 9736858..92ad2d3 100644 --- a/internal/transport/vp8channel/transport.go +++ b/internal/transport/vp8channel/transport.go @@ -32,11 +32,15 @@ import ( const ( defaultMaxPayloadSize = 60 * 1024 defaultConnectTimeout = 60 * time.Second - rtpBufSize = 65536 - outboundQueueSize = 8192 - inboundQueueSize = 8192 - canSendHighWatermark = 90 // percent - keepaliveIdlePeriod = 100 * time.Millisecond + rtpBufSize = 65536 + // outboundQueueSize bounds KCP packets waiting for the paced writer. Sized + // to a couple of send windows so KCP's flush never blocks (a blocked + // WriteTo would stall KCP's update loop and delay ACKs); the paced writer + // keeps it drained so this depth is headroom, not standing latency. + outboundQueueSize = 2048 + inboundQueueSize = 8192 + canSendHighWatermark = 90 // percent + keepaliveIdlePeriod = 100 * time.Millisecond ) var ( @@ -100,6 +104,7 @@ type streamTransport struct { kcpOnce sync.Once frameInterval time.Duration batchSize int + perTickBytes int // localEpoch is stamped into every outgoing VP8 frame. Explicit // upper-layer resets rotate it so the peer can reset its KCP state too. @@ -189,6 +194,17 @@ func newStreamTransport( if batchSize <= 0 { batchSize = defaultBatchSize } + byteRate := opts.MaxBytesPerSec + if byteRate <= 0 { + byteRate = defaultMaxBytesPerSec + } + // Bytes we may emit per frame tick to hold the wire under byteRate. The + // ticker already paces at fps, so a per-tick cap bounds the rate without + // any token bookkeeping. Floor at one epoch header so keepalives fit. + perTickBytes := byteRate / fps + if perTickBytes < epochHdrLen { + perTickBytes = epochHdrLen + } tr := &streamTransport{ stream: stream, @@ -200,6 +216,7 @@ func newStreamTransport( writerDone: make(chan struct{}), frameInterval: time.Second / time.Duration(fps), batchSize: batchSize, + perTickBytes: perTickBytes, bindingToken: bindingToken(cfg.RoomURL), localEpoch: randomEpoch(), peers: make(map[uint32]*kcpRuntime), @@ -492,7 +509,7 @@ func (p *streamTransport) writerLoop() { var sample []byte select { case frame := <-p.outbound: - sample = p.batchSample(frame) + sample = p.batchSample(frame, p.perTickBytes) idleTicks = 0 default: idleTicks++ @@ -512,7 +529,10 @@ func (p *streamTransport) writerLoop() { } } -func (p *streamTransport) batchSample(first []byte) []byte { +func (p *streamTransport) batchSample(first []byte, maxBytes int) []byte { + if maxBytes <= 0 || maxBytes > defaultMaxPayloadSize { + maxBytes = defaultMaxPayloadSize + } if len(first) <= epochHdrLen || p.batchSize <= 1 { return first } @@ -529,7 +549,7 @@ func (p *streamTransport) batchSample(first []byte) []byte { continue } payload := frame[epochHdrLen:] - if len(sample)+2+len(payload) > defaultMaxPayloadSize { + if len(sample)+2+len(payload) > maxBytes { return sample } sample = appendBatchPacket(sample, payload) diff --git a/internal/transport/vp8channel/transport_test.go b/internal/transport/vp8channel/transport_test.go index 1bf7854..9326cd5 100644 --- a/internal/transport/vp8channel/transport_test.go +++ b/internal/transport/vp8channel/transport_test.go @@ -128,7 +128,7 @@ func TestBatchSampleCarriesMultipleKCPPackets(t *testing.T) { tr.outbound <- packet("three") tr.outbound <- packet("four") - sample := tr.batchSample(packet("one")) + sample := tr.batchSample(packet("one"), defaultMaxPayloadSize) if !bytes.Equal(sample[:epochHdrLen], hdr[:]) { t.Fatalf("sample epoch header = %x, want %x", sample[:epochHdrLen], hdr[:]) }