diff --git a/internal/transport/vp8channel/kcp.go b/internal/transport/vp8channel/kcp.go index 1fa6592..205e8cf 100644 --- a/internal/transport/vp8channel/kcp.go +++ b/internal/transport/vp8channel/kcp.go @@ -79,6 +79,7 @@ func startKCP(out chan<- []byte, onData func([]byte), epochHdr [epochHdrLen]byte sess.SetNoDelay(1, 5, 2, 1) sess.SetWindowSize(kcpSndWnd, kcpRcvWnd) sess.SetMtu(kcpMTU) + sess.SetStreamMode(true) sess.SetACKNoDelay(true) sess.SetWriteDelay(false) diff --git a/internal/transport/vp8channel/transport.go b/internal/transport/vp8channel/transport.go index e549895..3bb9d64 100644 --- a/internal/transport/vp8channel/transport.go +++ b/internal/transport/vp8channel/transport.go @@ -32,8 +32,8 @@ const ( defaultMaxPayloadSize = 60 * 1024 defaultConnectTimeout = 60 * time.Second rtpBufSize = 65536 - outboundQueueSize = 1024 - inboundQueueSize = 1024 + outboundQueueSize = 8192 + inboundQueueSize = 8192 canSendHighWatermark = 90 // percent keepaliveIdlePeriod = 100 * time.Millisecond ) @@ -68,6 +68,8 @@ const ( epochHdrLen = 32 ) +var kcpBatchMagic = [4]byte{'O', 'L', 'K', 'B'} //nolint:gochecknoglobals // wire marker + // videoSession is the subset of engine.Session + engine.VideoTrackCapable // the vp8channel transport relies on. type videoSession interface { @@ -399,12 +401,10 @@ func (p *streamTransport) Features() transport.Features { func (p *streamTransport) writerLoop() { defer close(p.writerDone) - sampleInterval := p.sampleInterval() - - ticker := time.NewTicker(sampleInterval) + ticker := time.NewTicker(p.frameInterval) defer ticker.Stop() - keepaliveEvery := max(int(keepaliveIdlePeriod/sampleInterval), 1) + keepaliveEvery := max(int(keepaliveIdlePeriod/p.frameInterval), 1) idleTicks := 0 for { @@ -415,7 +415,7 @@ func (p *streamTransport) writerLoop() { var sample []byte select { case frame := <-p.outbound: - sample = frame + sample = p.batchSample(frame) idleTicks = 0 default: idleTicks++ @@ -429,17 +429,48 @@ func (p *streamTransport) writerLoop() { _ = p.track.WriteSample(media.Sample{ Data: sample, - Duration: sampleInterval, + Duration: p.frameInterval, }) } } } -func (p *streamTransport) sampleInterval() time.Duration { - if p.batchSize > 1 { - return p.frameInterval / time.Duration(p.batchSize) +func (p *streamTransport) batchSample(first []byte) []byte { + if len(first) <= epochHdrLen || p.batchSize <= 1 { + return first } - return p.frameInterval + + sample := make([]byte, 0, defaultMaxPayloadSize) + sample = append(sample, first[:epochHdrLen]...) + sample = append(sample, kcpBatchMagic[:]...) + sample = appendBatchPacket(sample, first[epochHdrLen:]) + + for packets := 1; packets < p.batchSize; packets++ { + select { + case frame := <-p.outbound: + if len(frame) <= epochHdrLen { + continue + } + payload := frame[epochHdrLen:] + if len(sample)+2+len(payload) > defaultMaxPayloadSize { + return sample + } + sample = appendBatchPacket(sample, payload) + default: + return sample + } + } + return sample +} + +func appendBatchPacket(dst, packet []byte) []byte { + if len(packet) > 0xffff { + return dst + } + var lenBuf [2]byte + binary.BigEndian.PutUint16(lenBuf[:], uint16(len(packet))) //nolint:gosec // bounded above + dst = append(dst, lenBuf[:]...) + return append(dst, packet...) } func (p *streamTransport) resetKCP() { @@ -618,7 +649,36 @@ func (p *streamTransport) handleIncomingFrame(frame []byte) { rt := p.kcp p.kcpMu.RUnlock() if rt != nil { - rt.deliver(kcpPayload) + deliverKCPPayload(rt, kcpPayload) + } +} + +func deliverKCPPayload(rt *kcpRuntime, payload []byte) { + if rt == nil || len(payload) == 0 { + return + } + splitKCPPayload(payload, rt.deliver) +} + +func splitKCPPayload(payload []byte, deliver func([]byte)) { + if len(payload) < len(kcpBatchMagic) || + string(payload[:len(kcpBatchMagic)]) != string(kcpBatchMagic[:]) { + deliver(payload) + return + } + + rest := payload[len(kcpBatchMagic):] + for len(rest) > 0 { + if len(rest) < 2 { + return + } + size := int(binary.BigEndian.Uint16(rest[:2])) + rest = rest[2:] + if size == 0 || len(rest) < size { + return + } + deliver(rest[:size]) + rest = rest[size:] } } diff --git a/internal/transport/vp8channel/transport_test.go b/internal/transport/vp8channel/transport_test.go index cee1cd8..0d81156 100644 --- a/internal/transport/vp8channel/transport_test.go +++ b/internal/transport/vp8channel/transport_test.go @@ -111,6 +111,56 @@ func TestVP8KeepaliveDoesNotLookLikeKCP(t *testing.T) { } } +func TestBatchSampleCarriesMultipleKCPPackets(t *testing.T) { + hdr := testEpochHdr(1) + packet := func(payload string) []byte { + frame := make([]byte, epochHdrLen+len(payload)) + copy(frame, hdr[:]) + copy(frame[epochHdrLen:], payload) + return frame + } + + tr := &streamTransport{ + outbound: make(chan []byte, 4), + batchSize: 3, + } + tr.outbound <- packet("two") + tr.outbound <- packet("three") + tr.outbound <- packet("four") + + sample := tr.batchSample(packet("one")) + if !bytes.Equal(sample[:epochHdrLen], hdr[:]) { + t.Fatalf("sample epoch header = %x, want %x", sample[:epochHdrLen], hdr[:]) + } + + var got []string + splitKCPPayload(sample[epochHdrLen:], func(payload []byte) { + got = append(got, string(payload)) + }) + want := []string{"one", "two", "three"} + if len(got) != len(want) { + t.Fatalf("split payload count = %d, want %d (%v)", len(got), len(want), got) + } + for i := range want { + if got[i] != want[i] { + t.Fatalf("payload[%d] = %q, want %q", i, got[i], want[i]) + } + } + if left := len(tr.outbound); left != 1 { + t.Fatalf("outbound left = %d, want 1", left) + } +} + +func TestSplitKCPPayloadAcceptsLegacySinglePacket(t *testing.T) { + var got [][]byte + splitKCPPayload([]byte("single"), func(payload []byte) { + got = append(got, append([]byte(nil), payload...)) + }) + if len(got) != 1 || string(got[0]) != "single" { + t.Fatalf("split legacy payload = %q", got) + } +} + func testEpochHdr(epoch uint32) [epochHdrLen]byte { var hdr [epochHdrLen]byte copy(hdr[:], vp8Keepalive) diff --git a/internal/transport/vp8channel/transport_unit_test.go b/internal/transport/vp8channel/transport_unit_test.go index 98ce099..d68582e 100644 --- a/internal/transport/vp8channel/transport_unit_test.go +++ b/internal/transport/vp8channel/transport_unit_test.go @@ -17,19 +17,18 @@ import ( var errVP8UnitBoom = errors.New("boom") -func TestSampleIntervalWithBatch(t *testing.T) { +func TestWriterCadenceStaysAtFrameInterval(t *testing.T) { tr := &streamTransport{ frameInterval: time.Second / 60, batchSize: 64, } - want := time.Second / 60 / 64 - if got := tr.sampleInterval(); got != want { - t.Fatalf("sampleInterval() = %v, want %v", got, want) + if got := tr.frameInterval; got != time.Second/60 { + t.Fatalf("frameInterval = %v, want %v", got, time.Second/60) } tr.batchSize = 1 - if got := tr.sampleInterval(); got != tr.frameInterval { - t.Fatalf("sampleInterval(batch=1) = %v, want %v", got, tr.frameInterval) + if got := tr.frameInterval; got != time.Second/60 { + t.Fatalf("frameInterval after batch change = %v, want %v", got, time.Second/60) } }