feat: add vp8-fps and vp8-batch settings for vp8channel throughput tuning

This commit is contained in:
zarazaex69
2026-04-22 17:17:08 +03:00
parent 01a3b3a9d6
commit 6379fa527e
9 changed files with 85 additions and 32 deletions

View File

@@ -37,6 +37,8 @@ type config struct {
videoBitrate string
videoHW string
videoQRSize int
vp8FPS int
vp8BatchSize int
}
func main() {
@@ -113,6 +115,8 @@ func parseFlags() config {
flag.StringVar(&cfg.videoBitrate, "video-bitrate", "", "Video bitrate (videochannel only)")
flag.StringVar(&cfg.videoHW, "video-hw", "", "Hardware acceleration (none, nvenc)")
flag.IntVar(&cfg.videoQRSize, "video-qr-size", 0, "Video QR code fragment size (videochannel only)")
flag.IntVar(&cfg.vp8FPS, "vp8-fps", 0, "VP8 frames per second (vp8channel only, default 25)")
flag.IntVar(&cfg.vp8BatchSize, "vp8-batch", 0, "VP8 frames per tick (vp8channel only, default 1)")
flag.Parse()
return cfg
@@ -166,6 +170,8 @@ func toSessionConfig(cfg config) session.Config {
VideoBitrate: cfg.videoBitrate,
VideoHW: cfg.videoHW,
VideoQRSize: cfg.videoQRSize,
VP8FPS: cfg.vp8FPS,
VP8BatchSize: cfg.vp8BatchSize,
}
}

View File

@@ -73,6 +73,8 @@ type Config struct {
VideoBitrate string
VideoHW string
VideoQRSize int
VP8FPS int
VP8BatchSize int
}
// RegisterDefaults registers built-in providers and transports.
@@ -206,6 +208,8 @@ func Run(ctx context.Context, cfg Config) error {
cfg.VideoBitrate,
cfg.VideoHW,
cfg.VideoQRSize,
cfg.VP8FPS,
cfg.VP8BatchSize,
)
case "cnc":
return client.Run(
@@ -225,6 +229,8 @@ func Run(ctx context.Context, cfg Config) error {
cfg.VideoBitrate,
cfg.VideoHW,
cfg.VideoQRSize,
cfg.VP8FPS,
cfg.VP8BatchSize,
)
default:
return ErrModeRequired

View File

@@ -61,8 +61,10 @@ func Run(
videoBitrate string,
videoHW string,
videoQRSize int,
vp8FPS int,
vp8BatchSize int,
) error {
return RunWithReady(ctx, linkName, transportName, carrierName, roomURL, keyHex, localAddr, dnsServer, socksUser, socksPass, nil, videoWidth, videoHeight, videoFPS, videoBitrate, videoHW, videoQRSize)
return RunWithReady(ctx, linkName, transportName, carrierName, roomURL, keyHex, localAddr, dnsServer, socksUser, socksPass, nil, videoWidth, videoHeight, videoFPS, videoBitrate, videoHW, videoQRSize, vp8FPS, vp8BatchSize)
}
// RunWithReady is like Run but accepts a callback that is called when the client is ready.
@@ -84,6 +86,8 @@ func RunWithReady(
videoBitrate string,
videoHW string,
videoQRSize int,
vp8FPS int,
vp8BatchSize int,
) error {
runCtx, cancel := context.WithCancel(ctx)
defer cancel()
@@ -111,7 +115,7 @@ func RunWithReady(
const linkCount = 1
for i := range linkCount {
if err := c.addLink(runCtx, linkName, transportName, carrierName, roomURL, i, cancel, dnsServer, "", 0, videoWidth, videoHeight, videoFPS, videoBitrate, videoHW, videoQRSize); err != nil {
if err := c.addLink(runCtx, linkName, transportName, carrierName, roomURL, i, cancel, dnsServer, "", 0, videoWidth, videoHeight, videoFPS, videoBitrate, videoHW, videoQRSize, vp8FPS, vp8BatchSize); err != nil {
return fmt.Errorf("addLink failed: %w", err)
}
}
@@ -215,6 +219,8 @@ func (c *Client) addLink(
videoWidth, videoHeight, videoFPS int,
videoBitrate, videoHW string,
videoQRSize int,
vp8FPS int,
vp8BatchSize int,
) error {
ln, err := link.New(ctx, linkName, link.Config{
Transport: transportName,
@@ -231,6 +237,8 @@ func (c *Client) addLink(
VideoBitrate: videoBitrate,
VideoHW: videoHW,
VideoQRSize: videoQRSize,
VP8FPS: vp8FPS,
VP8BatchSize: vp8BatchSize,
})
if err != nil {
return fmt.Errorf("failed to create link: %w", err)

View File

@@ -29,6 +29,8 @@ func New(ctx context.Context, cfg link.Config) (link.Link, error) {
VideoBitrate: cfg.VideoBitrate,
VideoHW: cfg.VideoHW,
VideoQRSize: cfg.VideoQRSize,
VP8FPS: cfg.VP8FPS,
VP8BatchSize: cfg.VP8BatchSize,
})
if err != nil {
return nil, fmt.Errorf("create transport for direct link: %w", err)

View File

@@ -39,6 +39,8 @@ type Config struct {
VideoBitrate string
VideoHW string
VideoQRSize int
VP8FPS int
VP8BatchSize int
}
// Factory creates a link instance.

View File

@@ -80,6 +80,8 @@ func Run(
videoBitrate string,
videoHW string,
videoQRSize int,
vp8FPS int,
vp8BatchSize int,
) error {
runCtx, cancel := context.WithCancel(ctx)
defer cancel()
@@ -104,7 +106,7 @@ func Run(
const linkCount = 1
for i := range linkCount {
if err := s.addLink(runCtx, linkName, transportName, carrierName, roomURL, i, cancel, videoWidth, videoHeight, videoFPS, videoBitrate, videoHW, videoQRSize); err != nil {
if err := s.addLink(runCtx, linkName, transportName, carrierName, roomURL, i, cancel, videoWidth, videoHeight, videoFPS, videoBitrate, videoHW, videoQRSize, vp8FPS, vp8BatchSize); err != nil {
return fmt.Errorf("addLink failed: %w", err)
}
}
@@ -191,6 +193,8 @@ func (s *Server) addLink(
videoWidth, videoHeight, videoFPS int,
videoBitrate, videoHW string,
videoQRSize int,
vp8FPS int,
vp8BatchSize int,
) error {
ln, err := link.New(ctx, linkName, link.Config{
Transport: transportName,
@@ -207,6 +211,8 @@ func (s *Server) addLink(
VideoBitrate: videoBitrate,
VideoHW: videoHW,
VideoQRSize: videoQRSize,
VP8FPS: vp8FPS,
VP8BatchSize: vp8BatchSize,
})
if err != nil {
return fmt.Errorf("failed to create link: %w", err)

View File

@@ -47,6 +47,8 @@ type Config struct {
VideoBitrate string
VideoHW string
VideoQRSize int
VP8FPS int
VP8BatchSize int
}
// Factory creates a transport instance.

View File

@@ -19,7 +19,8 @@ import (
const (
defaultMaxPayloadSize = 60 * 1024
defaultFrameInterval = 40 * time.Millisecond
defaultFPS = 25
defaultBatchSize = 1
defaultConnectTimeout = 30 * time.Second
dataMarker = 0xFF
rtpBufSize = 65536
@@ -37,15 +38,17 @@ var vp8Keepalive = []byte{
}
type streamTransport struct {
stream carrier.VideoTrack
track *webrtc.TrackLocalStaticSample
onData func([]byte)
outbound chan []byte
closeCh chan struct{}
writerDone chan struct{}
closed atomic.Bool
writerUp atomic.Bool
startOnce sync.Once
stream carrier.VideoTrack
track *webrtc.TrackLocalStaticSample
onData func([]byte)
outbound chan []byte
closeCh chan struct{}
writerDone chan struct{}
closed atomic.Bool
writerUp atomic.Bool
startOnce sync.Once
frameInterval time.Duration
batchSize int
}
func New(ctx context.Context, cfg transport.Config) (transport.Transport, error) {
@@ -83,13 +86,24 @@ func New(ctx context.Context, cfg transport.Config) (transport.Transport, error)
return nil, fmt.Errorf("create local video track: %w", err)
}
fps := cfg.VP8FPS
if fps <= 0 {
fps = defaultFPS
}
batchSize := cfg.VP8BatchSize
if batchSize <= 0 {
batchSize = defaultBatchSize
}
tr := &streamTransport{
stream: stream,
track: track,
onData: cfg.OnData,
outbound: make(chan []byte, 256),
closeCh: make(chan struct{}),
writerDone: make(chan struct{}),
stream: stream,
track: track,
onData: cfg.OnData,
outbound: make(chan []byte, 256),
closeCh: make(chan struct{}),
writerDone: make(chan struct{}),
frameInterval: time.Second / time.Duration(fps),
batchSize: batchSize,
}
if err := stream.AddTrack(track); err != nil {
@@ -174,7 +188,7 @@ func (p *streamTransport) Features() transport.Features {
func (p *streamTransport) writerLoop() {
defer close(p.writerDone)
ticker := time.NewTicker(defaultFrameInterval)
ticker := time.NewTicker(p.frameInterval)
defer ticker.Stop()
for {
@@ -182,19 +196,26 @@ func (p *streamTransport) writerLoop() {
case <-p.closeCh:
return
case <-ticker.C:
var sample []byte
sent := 0
for sent < p.batchSize {
var sample []byte
select {
case frame := <-p.outbound:
sample = frame
default:
sample = vp8Keepalive
}
select {
case frame := <-p.outbound:
sample = frame
default:
sample = vp8Keepalive
_ = p.track.WriteSample(media.Sample{
Data: sample,
Duration: p.frameInterval,
})
sent++
if sample[0] != dataMarker {
break
}
}
_ = p.track.WriteSample(media.Sample{
Data: sample,
Duration: defaultFrameInterval,
})
}
}
}

View File

@@ -123,7 +123,7 @@ func Start(roomID, keyHex string, socksPort int, socksUser, socksPass string) er
close(localReady)
})
},
0, 0, 0, "", "", 0,
0, 0, 0, "", "", 0, 0, 0,
)
mu.Lock()