mirror of
https://github.com/openlibrecommunity/olcrtc.git
synced 2026-05-30 08:59:43 +00:00
refactor(videochannel): remove ffmpeg support and fix lint
This commit is contained in:
@@ -25,7 +25,6 @@ import (
|
||||
"github.com/openlibrecommunity/olcrtc/internal/logger"
|
||||
"github.com/openlibrecommunity/olcrtc/internal/names"
|
||||
"github.com/openlibrecommunity/olcrtc/internal/supervisor"
|
||||
"github.com/openlibrecommunity/olcrtc/internal/transport/videochannel"
|
||||
)
|
||||
|
||||
const modeGen = "gen"
|
||||
@@ -52,7 +51,6 @@ type loadedConfig struct {
|
||||
failover failoverConfig
|
||||
dataDir string
|
||||
debug bool
|
||||
ffmpegPath string
|
||||
}
|
||||
|
||||
type failoverConfig struct {
|
||||
@@ -115,7 +113,6 @@ func loadConfig(path string) (loadedConfig, error) {
|
||||
failover: failover,
|
||||
dataDir: f.Data,
|
||||
debug: f.Debug,
|
||||
ffmpegPath: f.FFmpeg,
|
||||
}, nil
|
||||
}
|
||||
|
||||
@@ -134,10 +131,6 @@ func parseFailoverConfig(f configpkg.Failover) (failoverConfig, error) {
|
||||
func runWithConfig(cfg loadedConfig) error {
|
||||
configureLogging(cfg.debug)
|
||||
|
||||
if cfg.ffmpegPath != "ffmpeg" && cfg.ffmpegPath != "" {
|
||||
videochannel.FFmpegPath = cfg.ffmpegPath
|
||||
}
|
||||
|
||||
scfg, err := session.ApplyAuthDefaults(cfg.scfg)
|
||||
if err != nil {
|
||||
return fmt.Errorf("validate config: %w", err)
|
||||
|
||||
@@ -51,7 +51,6 @@ type File struct {
|
||||
Failover Failover `yaml:"failover"`
|
||||
Data string `yaml:"data"`
|
||||
Debug bool `yaml:"debug"`
|
||||
FFmpeg string `yaml:"ffmpeg"`
|
||||
}
|
||||
|
||||
// Profile is a failover entry that overrides top-level runtime fields.
|
||||
|
||||
75
internal/transport/videochannel/codec_spec.go
Normal file
75
internal/transport/videochannel/codec_spec.go
Normal file
@@ -0,0 +1,75 @@
|
||||
package videochannel
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"strings"
|
||||
|
||||
"github.com/pion/rtp"
|
||||
"github.com/pion/rtp/codecs"
|
||||
"github.com/pion/webrtc/v4"
|
||||
)
|
||||
|
||||
// ErrUnexpectedFrameSize is returned when the raw frame size does not match expectations.
|
||||
var ErrUnexpectedFrameSize = errors.New("unexpected encoder frame size")
|
||||
|
||||
// codecSpec describes a video codec used by videochannel: its WebRTC
|
||||
// capability and the depacketizer for inbound RTP streams.
|
||||
type codecSpec struct {
|
||||
mimeType string
|
||||
capability webrtc.RTPCodecCapability
|
||||
depacketizer func() rtp.Depacketizer
|
||||
}
|
||||
|
||||
// codecSpecForCarrier returns the codec used to negotiate outbound video for
|
||||
// the given carrier. Currently every carrier uses VP8.
|
||||
func codecSpecForCarrier(_ string) codecSpec {
|
||||
return vp8CodecSpec()
|
||||
}
|
||||
|
||||
// codecSpecForMime returns the codec spec matching a WebRTC MIME type
|
||||
// reported by the remote peer.
|
||||
func codecSpecForMime(mimeType string) (codecSpec, bool) {
|
||||
switch strings.ToLower(mimeType) {
|
||||
case strings.ToLower(webrtc.MimeTypeH264):
|
||||
return h264CodecSpec(), true
|
||||
case strings.ToLower(webrtc.MimeTypeVP9):
|
||||
return vp9CodecSpec(), true
|
||||
case strings.ToLower(webrtc.MimeTypeVP8):
|
||||
return vp8CodecSpec(), true
|
||||
default:
|
||||
return codecSpec{}, false
|
||||
}
|
||||
}
|
||||
|
||||
func h264CodecSpec() codecSpec {
|
||||
return codecSpec{
|
||||
mimeType: webrtc.MimeTypeH264,
|
||||
capability: webrtc.RTPCodecCapability{
|
||||
MimeType: webrtc.MimeTypeH264,
|
||||
ClockRate: 90000,
|
||||
},
|
||||
depacketizer: func() rtp.Depacketizer { return &codecs.H264Packet{} },
|
||||
}
|
||||
}
|
||||
|
||||
func vp9CodecSpec() codecSpec {
|
||||
return codecSpec{
|
||||
mimeType: webrtc.MimeTypeVP9,
|
||||
capability: webrtc.RTPCodecCapability{
|
||||
MimeType: webrtc.MimeTypeVP9,
|
||||
ClockRate: 90000,
|
||||
},
|
||||
depacketizer: func() rtp.Depacketizer { return &codecs.VP9Packet{} },
|
||||
}
|
||||
}
|
||||
|
||||
func vp8CodecSpec() codecSpec {
|
||||
return codecSpec{
|
||||
mimeType: webrtc.MimeTypeVP8,
|
||||
capability: webrtc.RTPCodecCapability{
|
||||
MimeType: webrtc.MimeTypeVP8,
|
||||
ClockRate: 90000,
|
||||
},
|
||||
depacketizer: func() rtp.Depacketizer { return &codecs.VP8Packet{} },
|
||||
}
|
||||
}
|
||||
@@ -1,600 +0,0 @@
|
||||
package videochannel
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/binary"
|
||||
"errors"
|
||||
"fmt"
|
||||
"io"
|
||||
"os"
|
||||
"os/exec"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/pion/rtp"
|
||||
"github.com/pion/rtp/codecs"
|
||||
"github.com/pion/webrtc/v4"
|
||||
"github.com/pion/webrtc/v4/pkg/media/ivfreader"
|
||||
)
|
||||
|
||||
const (
|
||||
ffmpegFrameTimeout = 10 * time.Second
|
||||
|
||||
argCodecVideo = "-c:v"
|
||||
argPixFmt = "-pix_fmt"
|
||||
codecLibVPX = "libvpx"
|
||||
pixFmtYUV420P = "yuv420p"
|
||||
)
|
||||
|
||||
var (
|
||||
// ErrFFmpegUnavailable is returned when ffmpeg is not available on PATH.
|
||||
ErrFFmpegUnavailable = errors.New("ffmpeg is required for videochannel")
|
||||
// ErrUnsupportedVideoCodec is returned when videochannel cannot decode the negotiated codec.
|
||||
ErrUnsupportedVideoCodec = errors.New("unsupported video codec")
|
||||
// ErrEncoderTimeout is returned when the encoder does not produce a frame within the deadline.
|
||||
ErrEncoderTimeout = errors.New("encoder timeout")
|
||||
// ErrPopFrameTimeout is returned when no decoded frame is available within the deadline.
|
||||
ErrPopFrameTimeout = errors.New("pop frame timeout")
|
||||
// ErrUnexpectedFrameSize is returned when the raw frame size does not match expectations.
|
||||
ErrUnexpectedFrameSize = errors.New("unexpected encoder frame size")
|
||||
)
|
||||
|
||||
// FFmpegPath defines the path to the ffmpeg executable.
|
||||
//
|
||||
//nolint:gochecknoglobals // operator-controlled config overridden via CLI flag or FFMPEG_BIN env.
|
||||
var FFmpegPath = "ffmpeg"
|
||||
|
||||
type codecSpec struct {
|
||||
mimeType string
|
||||
fourCC string
|
||||
encoder string
|
||||
capability webrtc.RTPCodecCapability
|
||||
depacketizer func() rtp.Depacketizer
|
||||
encodeArgs []string
|
||||
}
|
||||
|
||||
func codecSpecForCarrier(_ string) codecSpec {
|
||||
return vp8CodecSpec()
|
||||
}
|
||||
|
||||
func codecSpecForMime(mimeType string) (codecSpec, bool) {
|
||||
switch strings.ToLower(mimeType) {
|
||||
case strings.ToLower(webrtc.MimeTypeH264):
|
||||
return h264CodecSpec(), true
|
||||
case strings.ToLower(webrtc.MimeTypeVP9):
|
||||
return vp9CodecSpec(), true
|
||||
case strings.ToLower(webrtc.MimeTypeVP8):
|
||||
return vp8CodecSpec(), true
|
||||
default:
|
||||
return codecSpec{}, false
|
||||
}
|
||||
}
|
||||
|
||||
func h264CodecSpec() codecSpec {
|
||||
return codecSpec{
|
||||
mimeType: webrtc.MimeTypeH264,
|
||||
fourCC: "H264",
|
||||
encoder: "libx264",
|
||||
capability: webrtc.RTPCodecCapability{
|
||||
MimeType: webrtc.MimeTypeH264,
|
||||
ClockRate: 90000,
|
||||
},
|
||||
depacketizer: func() rtp.Depacketizer { return &codecs.H264Packet{} },
|
||||
encodeArgs: []string{
|
||||
argCodecVideo, "libx264",
|
||||
"-preset", "ultrafast",
|
||||
"-tune", "zerolatency",
|
||||
"-g", "1",
|
||||
argPixFmt, pixFmtYUV420P,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func vp9CodecSpec() codecSpec {
|
||||
return codecSpec{
|
||||
mimeType: webrtc.MimeTypeVP9,
|
||||
fourCC: "VP90",
|
||||
encoder: "libvpx-vp9",
|
||||
capability: webrtc.RTPCodecCapability{
|
||||
MimeType: webrtc.MimeTypeVP9,
|
||||
ClockRate: 90000,
|
||||
},
|
||||
depacketizer: func() rtp.Depacketizer { return &codecs.VP9Packet{} },
|
||||
encodeArgs: []string{
|
||||
argCodecVideo, "libvpx-vp9",
|
||||
"-deadline", "realtime",
|
||||
"-cpu-used", "8",
|
||||
"-lag-in-frames", "0",
|
||||
"-error-resilient", "1",
|
||||
"-static-thresh", "0",
|
||||
"-g", "1",
|
||||
argPixFmt, pixFmtYUV420P,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func vp8CodecSpec() codecSpec {
|
||||
return codecSpec{
|
||||
mimeType: webrtc.MimeTypeVP8,
|
||||
fourCC: "VP80",
|
||||
encoder: codecLibVPX,
|
||||
capability: webrtc.RTPCodecCapability{
|
||||
MimeType: webrtc.MimeTypeVP8,
|
||||
ClockRate: 90000,
|
||||
},
|
||||
depacketizer: func() rtp.Depacketizer { return &codecs.VP8Packet{} },
|
||||
encodeArgs: []string{
|
||||
argCodecVideo, codecLibVPX,
|
||||
"-deadline", "realtime",
|
||||
"-cpu-used", "8",
|
||||
"-lag-in-frames", "0",
|
||||
"-error-resilient", "1",
|
||||
"-static-thresh", "0",
|
||||
"-g", "1",
|
||||
argPixFmt, pixFmtYUV420P,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func resolveEncoderCodec(spec codecSpec, hw string) string {
|
||||
if hw != "nvenc" {
|
||||
return spec.encoder
|
||||
}
|
||||
switch spec.mimeType {
|
||||
case webrtc.MimeTypeH264:
|
||||
return "h264_nvenc"
|
||||
case webrtc.MimeTypeVP8:
|
||||
return "vp8_nvenc"
|
||||
case webrtc.MimeTypeVP9:
|
||||
return "vp9_nvenc"
|
||||
case webrtc.MimeTypeAV1:
|
||||
return "av1_nvenc"
|
||||
default:
|
||||
return spec.encoder
|
||||
}
|
||||
}
|
||||
|
||||
func buildEncoderArgs(spec codecSpec, vcodec string, width, height, fps int, bitrate string) []string {
|
||||
args := []string{
|
||||
"-loglevel", "error", "-threads", "1",
|
||||
"-f", "rawvideo",
|
||||
argPixFmt, "gray",
|
||||
"-video_size", strconv.Itoa(width) + "x" + strconv.Itoa(height),
|
||||
"-framerate", strconv.Itoa(fps),
|
||||
"-i", "pipe:0",
|
||||
"-an",
|
||||
}
|
||||
|
||||
if strings.HasSuffix(vcodec, "_nvenc") {
|
||||
args = append(args, argCodecVideo, vcodec, "-preset", "p1", "-tune", "ull", "-rc", "vbr")
|
||||
} else {
|
||||
args = append(args, spec.encodeArgs...)
|
||||
}
|
||||
|
||||
args = append(args, "-g", "1", argPixFmt, pixFmtYUV420P, "-b:v", bitrate)
|
||||
|
||||
if spec.mimeType == webrtc.MimeTypeH264 {
|
||||
return append(args, "-f", "h264", "pipe:1")
|
||||
}
|
||||
return append(args, "-f", "ivf", "pipe:1")
|
||||
}
|
||||
|
||||
type ffmpegEncoder struct {
|
||||
cmd *exec.Cmd
|
||||
stdin io.WriteCloser
|
||||
stderr *bytes.Buffer
|
||||
frames chan []byte
|
||||
width int
|
||||
height int
|
||||
frameSize int
|
||||
closed atomic.Bool
|
||||
closeOnce sync.Once
|
||||
errMu sync.Mutex
|
||||
err error
|
||||
}
|
||||
|
||||
func newFFmpegEncoder(
|
||||
ctx context.Context,
|
||||
spec codecSpec,
|
||||
width, height, fps int,
|
||||
bitrate, hw string,
|
||||
) (*ffmpegEncoder, error) {
|
||||
ffmpegBin := FFmpegPath
|
||||
if envBin := os.Getenv("FFMPEG_BIN"); envBin != "" {
|
||||
ffmpegBin = envBin
|
||||
}
|
||||
|
||||
if ffmpegBin == "ffmpeg" {
|
||||
if _, err := exec.LookPath("ffmpeg"); err != nil {
|
||||
return nil, ErrFFmpegUnavailable
|
||||
}
|
||||
} else {
|
||||
if _, err := os.Stat(ffmpegBin); err != nil { //nolint:gosec,lll // G703: ffmpegBin is operator-controlled config, not user input.
|
||||
return nil, fmt.Errorf("%w: %w", ErrFFmpegUnavailable, err)
|
||||
}
|
||||
}
|
||||
|
||||
vcodec := resolveEncoderCodec(spec, hw)
|
||||
args := buildEncoderArgs(spec, vcodec, width, height, fps, bitrate)
|
||||
|
||||
cmd := exec.CommandContext(ctx, ffmpegBin, args...) //nolint:gosec,lll // G204: ffmpeg path is operator-controlled config, not user input
|
||||
stdin, err := cmd.StdinPipe()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("encoder stdin: %w", err)
|
||||
}
|
||||
stdout, err := cmd.StdoutPipe()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("encoder stdout: %w", err)
|
||||
}
|
||||
stderr := &bytes.Buffer{}
|
||||
cmd.Stderr = stderr
|
||||
|
||||
if err := cmd.Start(); err != nil {
|
||||
return nil, fmt.Errorf("start encoder: %w", err)
|
||||
}
|
||||
|
||||
enc := &ffmpegEncoder{
|
||||
cmd: cmd,
|
||||
stdin: stdin,
|
||||
stderr: stderr,
|
||||
frames: make(chan []byte, 8),
|
||||
width: width,
|
||||
height: height,
|
||||
frameSize: width * height,
|
||||
}
|
||||
|
||||
if spec.mimeType == webrtc.MimeTypeH264 {
|
||||
go enc.readRawH264(stdout)
|
||||
} else {
|
||||
go enc.readIVF(stdout)
|
||||
}
|
||||
return enc, nil
|
||||
}
|
||||
|
||||
func (e *ffmpegEncoder) EncodeFrame(frame []byte) ([]byte, error) {
|
||||
if len(frame) != e.frameSize {
|
||||
return nil, fmt.Errorf("%w: got %d expected %d", ErrUnexpectedFrameSize, len(frame), e.frameSize)
|
||||
}
|
||||
if err := e.processErr(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := writeAll(e.stdin, frame); err != nil {
|
||||
return nil, fmt.Errorf("write encoder frame: %w", err)
|
||||
}
|
||||
|
||||
select {
|
||||
case sample, ok := <-e.frames:
|
||||
if !ok {
|
||||
return nil, e.processErr()
|
||||
}
|
||||
return sample, nil
|
||||
case <-time.After(ffmpegFrameTimeout):
|
||||
if err := e.processErr(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return nil, ErrEncoderTimeout
|
||||
}
|
||||
}
|
||||
|
||||
func (e *ffmpegEncoder) Close() error {
|
||||
e.closeOnce.Do(func() {
|
||||
e.closed.Store(true)
|
||||
_ = e.stdin.Close()
|
||||
if e.cmd.Process != nil {
|
||||
_ = e.cmd.Process.Kill()
|
||||
}
|
||||
_ = e.cmd.Wait()
|
||||
})
|
||||
return nil
|
||||
}
|
||||
|
||||
func (e *ffmpegEncoder) readIVF(stdout io.Reader) {
|
||||
defer close(e.frames)
|
||||
reader, _, err := ivfreader.NewWith(stdout)
|
||||
if err != nil {
|
||||
e.setErr(fmt.Errorf("encoder ivf header: %w", err))
|
||||
return
|
||||
}
|
||||
for {
|
||||
frame, _, err := reader.ParseNextFrame()
|
||||
if err != nil {
|
||||
if !e.closed.Load() {
|
||||
e.setErr(fmt.Errorf("encoder ivf read: %w", err))
|
||||
}
|
||||
return
|
||||
}
|
||||
copyFrame := append([]byte(nil), frame...)
|
||||
if e.closed.Load() {
|
||||
return
|
||||
}
|
||||
e.frames <- copyFrame
|
||||
}
|
||||
}
|
||||
|
||||
func (e *ffmpegEncoder) readRawH264(stdout io.Reader) {
|
||||
defer close(e.frames)
|
||||
buf := make([]byte, 1024*1024)
|
||||
for {
|
||||
n, err := stdout.Read(buf)
|
||||
if err != nil {
|
||||
if !e.closed.Load() {
|
||||
e.setErr(fmt.Errorf("encoder h264 read: %w", err))
|
||||
}
|
||||
return
|
||||
}
|
||||
if n > 0 {
|
||||
copyFrame := append([]byte(nil), buf[:n]...)
|
||||
if e.closed.Load() {
|
||||
return
|
||||
}
|
||||
e.frames <- copyFrame
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (e *ffmpegEncoder) setErr(err error) {
|
||||
if err == nil {
|
||||
return
|
||||
}
|
||||
e.errMu.Lock()
|
||||
defer e.errMu.Unlock()
|
||||
if e.err == nil {
|
||||
e.err = withStderr(err, e.stderr)
|
||||
}
|
||||
}
|
||||
|
||||
func (e *ffmpegEncoder) processErr() error {
|
||||
e.errMu.Lock()
|
||||
defer e.errMu.Unlock()
|
||||
if e.err != nil {
|
||||
return e.err
|
||||
}
|
||||
if e.closed.Load() {
|
||||
return ErrTransportClosed
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func resolveDecoderName(spec codecSpec, hw string) string {
|
||||
if hw != "nvenc" {
|
||||
return strings.ToLower(strings.TrimPrefix(spec.mimeType, "video/"))
|
||||
}
|
||||
switch spec.mimeType {
|
||||
case webrtc.MimeTypeH264:
|
||||
return "h264_cuvid"
|
||||
case webrtc.MimeTypeVP8:
|
||||
return "vp8_cuvid"
|
||||
case webrtc.MimeTypeVP9:
|
||||
return "vp9_cuvid"
|
||||
default:
|
||||
return strings.ToLower(strings.TrimPrefix(spec.mimeType, "video/"))
|
||||
}
|
||||
}
|
||||
|
||||
func buildDecoderArgs(spec codecSpec, decoderName string, width, height int, outputPixFmt string) []string {
|
||||
args := []string{"-loglevel", "error", "-threads", "1"}
|
||||
if spec.mimeType == webrtc.MimeTypeH264 {
|
||||
args = append(args, "-f", "h264")
|
||||
} else {
|
||||
args = append(args, "-f", "ivf")
|
||||
}
|
||||
|
||||
vfFilter := fmt.Sprintf("scale=%d:%d:flags=neighbor,format=%s", width, height, outputPixFmt)
|
||||
return append(args,
|
||||
"-flags", "low_delay",
|
||||
"-vcodec", decoderName,
|
||||
"-i", "pipe:0",
|
||||
"-an",
|
||||
"-vf", vfFilter,
|
||||
argPixFmt, outputPixFmt,
|
||||
"-f", "rawvideo",
|
||||
"pipe:1",
|
||||
)
|
||||
}
|
||||
|
||||
type ffmpegDecoder struct {
|
||||
cmd *exec.Cmd
|
||||
stdin io.WriteCloser
|
||||
stderr *bytes.Buffer
|
||||
frames chan []byte
|
||||
pts uint64
|
||||
mimeType string
|
||||
frameSize int
|
||||
closed atomic.Bool
|
||||
closeOnce sync.Once
|
||||
errMu sync.Mutex
|
||||
err error
|
||||
}
|
||||
|
||||
func newFFmpegDecoder(
|
||||
ctx context.Context,
|
||||
spec codecSpec,
|
||||
width, height, fps int,
|
||||
hw string,
|
||||
) (*ffmpegDecoder, error) {
|
||||
ffmpegBin := FFmpegPath
|
||||
if envBin := os.Getenv("FFMPEG_BIN"); envBin != "" {
|
||||
ffmpegBin = envBin
|
||||
}
|
||||
|
||||
if ffmpegBin == "ffmpeg" {
|
||||
if _, err := exec.LookPath("ffmpeg"); err != nil {
|
||||
return nil, ErrFFmpegUnavailable
|
||||
}
|
||||
} else {
|
||||
if _, err := os.Stat(ffmpegBin); err != nil { //nolint:gosec,lll // G703: ffmpegBin is operator-controlled config, not user input.
|
||||
return nil, fmt.Errorf("%w: %w", ErrFFmpegUnavailable, err)
|
||||
}
|
||||
}
|
||||
|
||||
decoderName := resolveDecoderName(spec, hw)
|
||||
args := buildDecoderArgs(spec, decoderName, width, height, "gray")
|
||||
|
||||
cmd := exec.CommandContext(ctx, ffmpegBin, args...) //nolint:gosec,lll // G204: ffmpeg path is operator-controlled config, not user input
|
||||
stdin, err := cmd.StdinPipe()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("decoder stdin: %w", err)
|
||||
}
|
||||
stdout, err := cmd.StdoutPipe()
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("decoder stdout: %w", err)
|
||||
}
|
||||
stderr := &bytes.Buffer{}
|
||||
cmd.Stderr = stderr
|
||||
|
||||
if err := cmd.Start(); err != nil {
|
||||
return nil, fmt.Errorf("start decoder: %w", err)
|
||||
}
|
||||
|
||||
dec := &ffmpegDecoder{
|
||||
cmd: cmd,
|
||||
stdin: stdin,
|
||||
stderr: stderr,
|
||||
frames: make(chan []byte, 32),
|
||||
mimeType: spec.mimeType,
|
||||
frameSize: width * height,
|
||||
}
|
||||
|
||||
if spec.mimeType != webrtc.MimeTypeH264 {
|
||||
if err := writeIVFHeader(stdin, spec.fourCC, width, height, fps); err != nil {
|
||||
_ = dec.Close()
|
||||
return nil, fmt.Errorf("decoder ivf header: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
go dec.readRawFrames(stdout)
|
||||
return dec, nil
|
||||
}
|
||||
|
||||
func (d *ffmpegDecoder) PushSample(sample []byte) error {
|
||||
if err := d.processErr(); err != nil {
|
||||
return err
|
||||
}
|
||||
if d.mimeType == webrtc.MimeTypeH264 {
|
||||
if err := writeAll(d.stdin, sample); err != nil {
|
||||
return fmt.Errorf("write h264 decoder frame: %w", err)
|
||||
}
|
||||
} else {
|
||||
if err := writeIVFFrame(d.stdin, d.pts, sample); err != nil {
|
||||
return fmt.Errorf("write ivf decoder frame: %w", err)
|
||||
}
|
||||
d.pts++
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (d *ffmpegDecoder) PopFrame() ([]byte, error) {
|
||||
select {
|
||||
case frame, ok := <-d.frames:
|
||||
if !ok {
|
||||
return nil, d.processErr()
|
||||
}
|
||||
return frame, nil
|
||||
case <-time.After(10 * time.Second):
|
||||
return nil, ErrPopFrameTimeout
|
||||
}
|
||||
}
|
||||
|
||||
func (d *ffmpegDecoder) Close() error {
|
||||
d.closeOnce.Do(func() {
|
||||
d.closed.Store(true)
|
||||
_ = d.stdin.Close()
|
||||
if d.cmd.Process != nil {
|
||||
_ = d.cmd.Process.Kill()
|
||||
}
|
||||
_ = d.cmd.Wait()
|
||||
})
|
||||
return nil
|
||||
}
|
||||
|
||||
func (d *ffmpegDecoder) readRawFrames(stdout io.Reader) {
|
||||
defer close(d.frames)
|
||||
buf := make([]byte, d.frameSize)
|
||||
for {
|
||||
if _, err := io.ReadFull(stdout, buf); err != nil {
|
||||
if !d.closed.Load() {
|
||||
d.setErr(fmt.Errorf("decoder raw read: %w", err))
|
||||
}
|
||||
return
|
||||
}
|
||||
copyFrame := append([]byte(nil), buf...)
|
||||
if d.closed.Load() {
|
||||
return
|
||||
}
|
||||
d.frames <- copyFrame
|
||||
}
|
||||
}
|
||||
|
||||
func (d *ffmpegDecoder) setErr(err error) {
|
||||
if err == nil {
|
||||
return
|
||||
}
|
||||
d.errMu.Lock()
|
||||
defer d.errMu.Unlock()
|
||||
if d.err == nil {
|
||||
d.err = withStderr(err, d.stderr)
|
||||
}
|
||||
}
|
||||
|
||||
func (d *ffmpegDecoder) processErr() error {
|
||||
d.errMu.Lock()
|
||||
defer d.errMu.Unlock()
|
||||
if d.err != nil {
|
||||
return d.err
|
||||
}
|
||||
if d.closed.Load() {
|
||||
return ErrTransportClosed
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func withStderr(err error, stderr *bytes.Buffer) error {
|
||||
if err == nil {
|
||||
return nil
|
||||
}
|
||||
msg := strings.TrimSpace(stderr.String())
|
||||
if msg == "" {
|
||||
return err
|
||||
}
|
||||
return fmt.Errorf("%w: %s", err, msg)
|
||||
}
|
||||
|
||||
func writeIVFHeader(w io.Writer, fourCC string, width, height, frameRate int) error {
|
||||
header := make([]byte, 32)
|
||||
copy(header[0:4], []byte("DKIF"))
|
||||
binary.LittleEndian.PutUint16(header[4:6], 0)
|
||||
binary.LittleEndian.PutUint16(header[6:8], 32)
|
||||
copy(header[8:12], []byte(fourCC))
|
||||
binary.LittleEndian.PutUint16(header[12:14], uint16(width)) //nolint:gosec,lll // G115: bounded conversion verified by surrounding logic
|
||||
binary.LittleEndian.PutUint16(header[14:16], uint16(height)) //nolint:gosec,lll // G115: bounded conversion verified by surrounding logic
|
||||
binary.LittleEndian.PutUint32(header[16:20], uint32(frameRate)) //nolint:gosec,lll // G115: bounded conversion verified by surrounding logic
|
||||
binary.LittleEndian.PutUint32(header[20:24], 1)
|
||||
binary.LittleEndian.PutUint32(header[24:28], 0)
|
||||
binary.LittleEndian.PutUint32(header[28:32], 0)
|
||||
return writeAll(w, header)
|
||||
}
|
||||
|
||||
func writeIVFFrame(w io.Writer, pts uint64, frame []byte) error {
|
||||
header := make([]byte, 12)
|
||||
binary.LittleEndian.PutUint32(header[0:4], uint32(len(frame))) //nolint:gosec,lll // G115: bounded conversion verified by surrounding logic
|
||||
binary.LittleEndian.PutUint64(header[4:12], pts)
|
||||
if err := writeAll(w, header); err != nil {
|
||||
return err
|
||||
}
|
||||
return writeAll(w, frame)
|
||||
}
|
||||
|
||||
func writeAll(w io.Writer, data []byte) error {
|
||||
for len(data) > 0 {
|
||||
n, err := w.Write(data)
|
||||
if err != nil {
|
||||
return fmt.Errorf("write: %w", err)
|
||||
}
|
||||
data = data[n:]
|
||||
}
|
||||
return nil
|
||||
}
|
||||
@@ -1,21 +1,12 @@
|
||||
package videochannel
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"errors"
|
||||
"io"
|
||||
"slices"
|
||||
"strings"
|
||||
"testing"
|
||||
|
||||
"github.com/pion/webrtc/v4"
|
||||
)
|
||||
|
||||
var (
|
||||
errVideoFrameBase = errors.New("base")
|
||||
errVideoFrameBoom = errors.New("boom")
|
||||
)
|
||||
|
||||
func TestDecodeTransportFrameErrorsAndAck(t *testing.T) {
|
||||
tests := []struct {
|
||||
data []byte
|
||||
@@ -43,8 +34,7 @@ func TestDecodeTransportFrameErrorsAndAck(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
//nolint:cyclop // table-driven test naturally has many branches
|
||||
func TestCodecSpecsAndArgs(t *testing.T) {
|
||||
func TestCodecSpecForMime(t *testing.T) {
|
||||
for _, mime := range []string{webrtc.MimeTypeH264, webrtc.MimeTypeVP8, webrtc.MimeTypeVP9} {
|
||||
spec, ok := codecSpecForMime(mime)
|
||||
if !ok {
|
||||
@@ -57,188 +47,7 @@ func TestCodecSpecsAndArgs(t *testing.T) {
|
||||
if _, ok := codecSpecForMime("video/unknown"); ok {
|
||||
t.Fatal("codecSpecForMime() accepted unknown mime")
|
||||
}
|
||||
|
||||
if got := resolveEncoderCodec(h264CodecSpec(), "nvenc"); got != "h264_nvenc" {
|
||||
t.Fatalf("resolveEncoderCodec(h264,nvenc) = %q", got)
|
||||
}
|
||||
if got := resolveEncoderCodec(vp8CodecSpec(), "none"); got != "libvpx" {
|
||||
t.Fatalf("resolveEncoderCodec(vp8,none) = %q", got)
|
||||
}
|
||||
if got := resolveEncoderCodec(vp9CodecSpec(), "nvenc"); got != "vp9_nvenc" {
|
||||
t.Fatalf("resolveEncoderCodec(vp9,nvenc) = %q", got)
|
||||
}
|
||||
if got := resolveEncoderCodec(codecSpec{mimeType: webrtc.MimeTypeAV1, encoder: "libaom-av1"}, "nvenc"); got != "av1_nvenc" { //nolint:lll // long test description
|
||||
t.Fatalf("resolveEncoderCodec(av1,nvenc) = %q", got)
|
||||
}
|
||||
|
||||
args := buildEncoderArgs(vp8CodecSpec(), "vp8_nvenc", 320, 240, 30, "1M")
|
||||
for _, want := range []string{"-video_size", "320x240", "-framerate", "30", "vp8_nvenc", "-b:v", "1M", "ivf"} {
|
||||
if !slices.Contains(args, want) {
|
||||
t.Fatalf("buildEncoderArgs() = %v, missing %q", args, want)
|
||||
}
|
||||
}
|
||||
h264Args := buildEncoderArgs(h264CodecSpec(), "libx264", 320, 240, 30, "1M")
|
||||
if h264Args[len(h264Args)-2] != "h264" {
|
||||
t.Fatalf("h264 encoder args = %v", h264Args)
|
||||
}
|
||||
|
||||
if got := resolveDecoderName(h264CodecSpec(), "nvenc"); got != "h264_cuvid" {
|
||||
t.Fatalf("resolveDecoderName(h264,nvenc) = %q", got)
|
||||
}
|
||||
if got := resolveDecoderName(vp8CodecSpec(), "nvenc"); got != "vp8_cuvid" {
|
||||
t.Fatalf("resolveDecoderName(vp8,nvenc) = %q", got)
|
||||
}
|
||||
if got := resolveDecoderName(vp9CodecSpec(), "nvenc"); got != "vp9_cuvid" {
|
||||
t.Fatalf("resolveDecoderName(vp9,nvenc) = %q", got)
|
||||
}
|
||||
if got := resolveDecoderName(codecSpec{mimeType: "video/custom"}, "none"); got != "custom" {
|
||||
t.Fatalf("resolveDecoderName(custom,none) = %q", got)
|
||||
}
|
||||
decArgs := buildDecoderArgs(vp8CodecSpec(), "vp8", 320, 240, "gray")
|
||||
for _, want := range []string{"-f", "ivf", "-vcodec", "vp8", "scale=320:240:flags=neighbor,format=gray", "rawvideo"} {
|
||||
if !slices.Contains(decArgs, want) {
|
||||
t.Fatalf("buildDecoderArgs(vp8) = %v, missing %q", decArgs, want)
|
||||
}
|
||||
}
|
||||
h264DecArgs := buildDecoderArgs(h264CodecSpec(), "h264", 320, 240, "gray")
|
||||
if h264DecArgs[5] != "h264" {
|
||||
t.Fatalf("buildDecoderArgs(h264) = %v", h264DecArgs)
|
||||
}
|
||||
}
|
||||
|
||||
type shortWriter struct {
|
||||
writes int
|
||||
}
|
||||
|
||||
func (w *shortWriter) Write(p []byte) (int, error) {
|
||||
w.writes++
|
||||
if w.writes == 1 {
|
||||
return 1, nil
|
||||
}
|
||||
return len(p), nil
|
||||
}
|
||||
|
||||
type errWriter struct{}
|
||||
|
||||
func (w errWriter) Write([]byte) (int, error) { return 0, io.ErrClosedPipe }
|
||||
|
||||
type bufferWriteCloser struct {
|
||||
bytes.Buffer
|
||||
}
|
||||
|
||||
func (w *bufferWriteCloser) Close() error { return nil }
|
||||
|
||||
//nolint:cyclop // table-driven test naturally has many branches
|
||||
func TestIVFWritersAndWithStderr(t *testing.T) {
|
||||
var buf bytes.Buffer
|
||||
if err := writeIVFHeader(&buf, "VP80", 320, 240, 30); err != nil {
|
||||
t.Fatalf("writeIVFHeader() error = %v", err)
|
||||
}
|
||||
if buf.Len() != 32 || string(buf.Bytes()[:4]) != "DKIF" {
|
||||
t.Fatalf("IVF header = %v", buf.Bytes())
|
||||
}
|
||||
|
||||
buf.Reset()
|
||||
if err := writeIVFFrame(&buf, 3, []byte("abc")); err != nil {
|
||||
t.Fatalf("writeIVFFrame() error = %v", err)
|
||||
}
|
||||
if buf.Len() != 15 {
|
||||
t.Fatalf("IVF frame len = %d, want 15", buf.Len())
|
||||
}
|
||||
|
||||
if err := writeAll(&shortWriter{}, []byte("abc")); err != nil {
|
||||
t.Fatalf("writeAll(shortWriter) error = %v", err)
|
||||
}
|
||||
if err := writeAll(errWriter{}, []byte("abc")); err == nil || !strings.Contains(err.Error(), "write:") {
|
||||
t.Fatalf("writeAll(errWriter) error = %v", err)
|
||||
}
|
||||
|
||||
baseErr := errVideoFrameBase
|
||||
if got := withStderr(baseErr, bytes.NewBufferString(" details \n")); got == nil || got.Error() != "base: details" {
|
||||
t.Fatalf("withStderr() = %v", got)
|
||||
}
|
||||
if got := withStderr(nil, bytes.NewBufferString("details")); got != nil {
|
||||
t.Fatalf("withStderr(nil) = %v", got)
|
||||
}
|
||||
}
|
||||
|
||||
func TestFFmpegProcessErrAndFrameValidation(t *testing.T) {
|
||||
enc := &ffmpegEncoder{
|
||||
stderr: bytes.NewBufferString("encoder failed"),
|
||||
frames: make(chan []byte, 1),
|
||||
frameSize: 4,
|
||||
}
|
||||
if _, err := enc.EncodeFrame([]byte("bad")); !errors.Is(err, ErrUnexpectedFrameSize) {
|
||||
t.Fatalf("EncodeFrame(short) error = %v, want %v", err, ErrUnexpectedFrameSize)
|
||||
}
|
||||
enc.setErr(errVideoFrameBoom)
|
||||
if _, err := enc.EncodeFrame([]byte("good")); err == nil || !strings.Contains(err.Error(), "encoder failed") {
|
||||
t.Fatalf("EncodeFrame(processErr) error = %v", err)
|
||||
}
|
||||
|
||||
dec := &ffmpegDecoder{stderr: bytes.NewBufferString("decoder failed")}
|
||||
dec.setErr(errVideoFrameBoom)
|
||||
if err := dec.PushSample([]byte("sample")); err == nil || !strings.Contains(err.Error(), "decoder failed") {
|
||||
t.Fatalf("PushSample(processErr) error = %v", err)
|
||||
}
|
||||
closed := &ffmpegDecoder{}
|
||||
closed.closed.Store(true)
|
||||
if err := closed.processErr(); !errors.Is(err, ErrTransportClosed) {
|
||||
t.Fatalf("decoder processErr(closed) = %v, want %v", err, ErrTransportClosed)
|
||||
}
|
||||
}
|
||||
|
||||
//nolint:cyclop // table-driven test naturally has many branches
|
||||
func TestFFmpegReadersAndSampleWriters(t *testing.T) {
|
||||
var ivf bytes.Buffer
|
||||
if err := writeIVFHeader(&ivf, "VP80", 2, 2, 30); err != nil {
|
||||
t.Fatalf("writeIVFHeader() error = %v", err)
|
||||
}
|
||||
if err := writeIVFFrame(&ivf, 1, []byte("frame")); err != nil {
|
||||
t.Fatalf("writeIVFFrame() error = %v", err)
|
||||
}
|
||||
enc := &ffmpegEncoder{stderr: &bytes.Buffer{}, frames: make(chan []byte, 2)}
|
||||
enc.readIVF(&ivf)
|
||||
if got := <-enc.frames; !bytes.Equal(got, []byte("frame")) {
|
||||
t.Fatalf("readIVF frame = %q", got)
|
||||
}
|
||||
|
||||
enc = &ffmpegEncoder{stderr: &bytes.Buffer{}, frames: make(chan []byte, 2)}
|
||||
enc.readRawH264(bytes.NewBufferString("h264"))
|
||||
if got := <-enc.frames; !bytes.Equal(got, []byte("h264")) {
|
||||
t.Fatalf("readRawH264 frame = %q", got)
|
||||
}
|
||||
|
||||
dec := &ffmpegDecoder{stderr: &bytes.Buffer{}, frames: make(chan []byte, 2), frameSize: 4}
|
||||
dec.readRawFrames(bytes.NewBufferString("aaaabbbb"))
|
||||
if got := <-dec.frames; !bytes.Equal(got, []byte("aaaa")) {
|
||||
t.Fatalf("readRawFrames first = %q", got)
|
||||
}
|
||||
if got := <-dec.frames; !bytes.Equal(got, []byte("bbbb")) {
|
||||
t.Fatalf("readRawFrames second = %q", got)
|
||||
}
|
||||
|
||||
h264In := &bufferWriteCloser{}
|
||||
dec = &ffmpegDecoder{stdin: h264In, mimeType: webrtc.MimeTypeH264}
|
||||
if err := dec.PushSample([]byte("sample")); err != nil {
|
||||
t.Fatalf("PushSample(h264) error = %v", err)
|
||||
}
|
||||
if h264In.String() != "sample" {
|
||||
t.Fatalf("h264 stdin = %q", h264In.String())
|
||||
}
|
||||
|
||||
ivfIn := &bufferWriteCloser{}
|
||||
dec = &ffmpegDecoder{stdin: ivfIn, mimeType: webrtc.MimeTypeVP8}
|
||||
if err := dec.PushSample([]byte("vp8")); err != nil {
|
||||
t.Fatalf("PushSample(vp8) error = %v", err)
|
||||
}
|
||||
if ivfIn.Len() != 12+len("vp8") || dec.pts != 1 {
|
||||
t.Fatalf("ivf stdin len=%d pts=%d", ivfIn.Len(), dec.pts)
|
||||
}
|
||||
|
||||
dec = &ffmpegDecoder{frames: make(chan []byte, 1)}
|
||||
dec.frames <- []byte("ready")
|
||||
if got, err := dec.PopFrame(); err != nil || !bytes.Equal(got, []byte("ready")) {
|
||||
t.Fatalf("PopFrame() = %q, %v", got, err)
|
||||
if got := codecSpecForCarrier("any-carrier"); got.mimeType != webrtc.MimeTypeVP8 {
|
||||
t.Fatalf("codecSpecForCarrier() = %+v, want vp8", got)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -9,7 +9,7 @@ import (
|
||||
"codeberg.org/rape4me/kc/vp8"
|
||||
)
|
||||
|
||||
// goEncoder is a pure Go VP8 encoder replacing ffmpegEncoder.
|
||||
// goEncoder is a pure Go VP8 encoder.
|
||||
type goEncoder struct {
|
||||
enc *vp8.Encoder
|
||||
width int
|
||||
@@ -39,7 +39,11 @@ func (e *goEncoder) EncodeFrame(frame []byte) ([]byte, error) {
|
||||
}
|
||||
e.mu.Lock()
|
||||
defer e.mu.Unlock()
|
||||
return e.enc.Encode(frame)
|
||||
encoded, err := e.enc.Encode(frame)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("vp8 encode: %w", err)
|
||||
}
|
||||
return encoded, nil
|
||||
}
|
||||
|
||||
func (e *goEncoder) Close() error {
|
||||
@@ -47,7 +51,7 @@ func (e *goEncoder) Close() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// goDecoder is a pure Go VP8 decoder replacing ffmpegDecoder.
|
||||
// goDecoder is a pure Go VP8 decoder.
|
||||
type goDecoder struct {
|
||||
dec *vp8.Decoder
|
||||
width int
|
||||
|
||||
@@ -252,7 +252,7 @@ func TestOutboundPriorityRenderAndClosedEnqueue(t *testing.T) {
|
||||
// per-attempt ack budget covers a full FPS-paced round trip through every
|
||||
// fragment. Without this, multi-fragment payloads trigger premature
|
||||
// retransmits that pile fragments into the outbound channel and starve
|
||||
// the ffmpeg encoder until it is killed.
|
||||
// the encoder until it is killed.
|
||||
func TestPerAttemptAckTimeoutScalesWithFragments(t *testing.T) {
|
||||
// Tiny payload: floor at defaultAckTimeout.
|
||||
if got := perAttemptAckTimeout(1, 25); got != defaultAckTimeout {
|
||||
|
||||
Reference in New Issue
Block a user