feat(vp8channel): batch multiple KCP packets per RTP sample

This commit is contained in:
zarazaex69
2026-05-20 00:12:01 +03:00
parent 74bb402289
commit bfa6d73ad1
4 changed files with 129 additions and 19 deletions

View File

@@ -79,6 +79,7 @@ func startKCP(out chan<- []byte, onData func([]byte), epochHdr [epochHdrLen]byte
sess.SetNoDelay(1, 5, 2, 1)
sess.SetWindowSize(kcpSndWnd, kcpRcvWnd)
sess.SetMtu(kcpMTU)
sess.SetStreamMode(true)
sess.SetACKNoDelay(true)
sess.SetWriteDelay(false)

View File

@@ -32,8 +32,8 @@ const (
defaultMaxPayloadSize = 60 * 1024
defaultConnectTimeout = 60 * time.Second
rtpBufSize = 65536
outboundQueueSize = 1024
inboundQueueSize = 1024
outboundQueueSize = 8192
inboundQueueSize = 8192
canSendHighWatermark = 90 // percent
keepaliveIdlePeriod = 100 * time.Millisecond
)
@@ -68,6 +68,8 @@ const (
epochHdrLen = 32
)
var kcpBatchMagic = [4]byte{'O', 'L', 'K', 'B'} //nolint:gochecknoglobals // wire marker
// videoSession is the subset of engine.Session + engine.VideoTrackCapable
// the vp8channel transport relies on.
type videoSession interface {
@@ -399,12 +401,10 @@ func (p *streamTransport) Features() transport.Features {
func (p *streamTransport) writerLoop() {
defer close(p.writerDone)
sampleInterval := p.sampleInterval()
ticker := time.NewTicker(sampleInterval)
ticker := time.NewTicker(p.frameInterval)
defer ticker.Stop()
keepaliveEvery := max(int(keepaliveIdlePeriod/sampleInterval), 1)
keepaliveEvery := max(int(keepaliveIdlePeriod/p.frameInterval), 1)
idleTicks := 0
for {
@@ -415,7 +415,7 @@ func (p *streamTransport) writerLoop() {
var sample []byte
select {
case frame := <-p.outbound:
sample = frame
sample = p.batchSample(frame)
idleTicks = 0
default:
idleTicks++
@@ -429,17 +429,48 @@ func (p *streamTransport) writerLoop() {
_ = p.track.WriteSample(media.Sample{
Data: sample,
Duration: sampleInterval,
Duration: p.frameInterval,
})
}
}
}
func (p *streamTransport) sampleInterval() time.Duration {
if p.batchSize > 1 {
return p.frameInterval / time.Duration(p.batchSize)
func (p *streamTransport) batchSample(first []byte) []byte {
if len(first) <= epochHdrLen || p.batchSize <= 1 {
return first
}
return p.frameInterval
sample := make([]byte, 0, defaultMaxPayloadSize)
sample = append(sample, first[:epochHdrLen]...)
sample = append(sample, kcpBatchMagic[:]...)
sample = appendBatchPacket(sample, first[epochHdrLen:])
for packets := 1; packets < p.batchSize; packets++ {
select {
case frame := <-p.outbound:
if len(frame) <= epochHdrLen {
continue
}
payload := frame[epochHdrLen:]
if len(sample)+2+len(payload) > defaultMaxPayloadSize {
return sample
}
sample = appendBatchPacket(sample, payload)
default:
return sample
}
}
return sample
}
func appendBatchPacket(dst, packet []byte) []byte {
if len(packet) > 0xffff {
return dst
}
var lenBuf [2]byte
binary.BigEndian.PutUint16(lenBuf[:], uint16(len(packet))) //nolint:gosec // bounded above
dst = append(dst, lenBuf[:]...)
return append(dst, packet...)
}
func (p *streamTransport) resetKCP() {
@@ -618,7 +649,36 @@ func (p *streamTransport) handleIncomingFrame(frame []byte) {
rt := p.kcp
p.kcpMu.RUnlock()
if rt != nil {
rt.deliver(kcpPayload)
deliverKCPPayload(rt, kcpPayload)
}
}
func deliverKCPPayload(rt *kcpRuntime, payload []byte) {
if rt == nil || len(payload) == 0 {
return
}
splitKCPPayload(payload, rt.deliver)
}
func splitKCPPayload(payload []byte, deliver func([]byte)) {
if len(payload) < len(kcpBatchMagic) ||
string(payload[:len(kcpBatchMagic)]) != string(kcpBatchMagic[:]) {
deliver(payload)
return
}
rest := payload[len(kcpBatchMagic):]
for len(rest) > 0 {
if len(rest) < 2 {
return
}
size := int(binary.BigEndian.Uint16(rest[:2]))
rest = rest[2:]
if size == 0 || len(rest) < size {
return
}
deliver(rest[:size])
rest = rest[size:]
}
}

View File

@@ -111,6 +111,56 @@ func TestVP8KeepaliveDoesNotLookLikeKCP(t *testing.T) {
}
}
func TestBatchSampleCarriesMultipleKCPPackets(t *testing.T) {
hdr := testEpochHdr(1)
packet := func(payload string) []byte {
frame := make([]byte, epochHdrLen+len(payload))
copy(frame, hdr[:])
copy(frame[epochHdrLen:], payload)
return frame
}
tr := &streamTransport{
outbound: make(chan []byte, 4),
batchSize: 3,
}
tr.outbound <- packet("two")
tr.outbound <- packet("three")
tr.outbound <- packet("four")
sample := tr.batchSample(packet("one"))
if !bytes.Equal(sample[:epochHdrLen], hdr[:]) {
t.Fatalf("sample epoch header = %x, want %x", sample[:epochHdrLen], hdr[:])
}
var got []string
splitKCPPayload(sample[epochHdrLen:], func(payload []byte) {
got = append(got, string(payload))
})
want := []string{"one", "two", "three"}
if len(got) != len(want) {
t.Fatalf("split payload count = %d, want %d (%v)", len(got), len(want), got)
}
for i := range want {
if got[i] != want[i] {
t.Fatalf("payload[%d] = %q, want %q", i, got[i], want[i])
}
}
if left := len(tr.outbound); left != 1 {
t.Fatalf("outbound left = %d, want 1", left)
}
}
func TestSplitKCPPayloadAcceptsLegacySinglePacket(t *testing.T) {
var got [][]byte
splitKCPPayload([]byte("single"), func(payload []byte) {
got = append(got, append([]byte(nil), payload...))
})
if len(got) != 1 || string(got[0]) != "single" {
t.Fatalf("split legacy payload = %q", got)
}
}
func testEpochHdr(epoch uint32) [epochHdrLen]byte {
var hdr [epochHdrLen]byte
copy(hdr[:], vp8Keepalive)

View File

@@ -17,19 +17,18 @@ import (
var errVP8UnitBoom = errors.New("boom")
func TestSampleIntervalWithBatch(t *testing.T) {
func TestWriterCadenceStaysAtFrameInterval(t *testing.T) {
tr := &streamTransport{
frameInterval: time.Second / 60,
batchSize: 64,
}
want := time.Second / 60 / 64
if got := tr.sampleInterval(); got != want {
t.Fatalf("sampleInterval() = %v, want %v", got, want)
if got := tr.frameInterval; got != time.Second/60 {
t.Fatalf("frameInterval = %v, want %v", got, time.Second/60)
}
tr.batchSize = 1
if got := tr.sampleInterval(); got != tr.frameInterval {
t.Fatalf("sampleInterval(batch=1) = %v, want %v", got, tr.frameInterval)
if got := tr.frameInterval; got != time.Second/60 {
t.Fatalf("frameInterval after batch change = %v, want %v", got, time.Second/60)
}
}