From 4ab4413f50a35e1c72eeb280d0719a28d545adcf Mon Sep 17 00:00:00 2001 From: zarazaex69 Date: Wed, 27 May 2026 11:08:49 +0300 Subject: [PATCH] refactor(videochannel): remove ffmpeg support and fix lint --- cmd/olcrtc/main.go | 7 - internal/config/config.go | 1 - internal/transport/videochannel/codec_spec.go | 75 +++ internal/transport/videochannel/ffmpeg.go | 600 ------------------ .../videochannel/frame_extra_test.go | 197 +----- internal/transport/videochannel/gocodec.go | 10 +- .../videochannel/transport_unit_test.go | 2 +- 7 files changed, 86 insertions(+), 806 deletions(-) create mode 100644 internal/transport/videochannel/codec_spec.go delete mode 100644 internal/transport/videochannel/ffmpeg.go diff --git a/cmd/olcrtc/main.go b/cmd/olcrtc/main.go index 5890501..cf91fa4 100644 --- a/cmd/olcrtc/main.go +++ b/cmd/olcrtc/main.go @@ -25,7 +25,6 @@ import ( "github.com/openlibrecommunity/olcrtc/internal/logger" "github.com/openlibrecommunity/olcrtc/internal/names" "github.com/openlibrecommunity/olcrtc/internal/supervisor" - "github.com/openlibrecommunity/olcrtc/internal/transport/videochannel" ) const modeGen = "gen" @@ -52,7 +51,6 @@ type loadedConfig struct { failover failoverConfig dataDir string debug bool - ffmpegPath string } type failoverConfig struct { @@ -115,7 +113,6 @@ func loadConfig(path string) (loadedConfig, error) { failover: failover, dataDir: f.Data, debug: f.Debug, - ffmpegPath: f.FFmpeg, }, nil } @@ -134,10 +131,6 @@ func parseFailoverConfig(f configpkg.Failover) (failoverConfig, error) { func runWithConfig(cfg loadedConfig) error { configureLogging(cfg.debug) - if cfg.ffmpegPath != "ffmpeg" && cfg.ffmpegPath != "" { - videochannel.FFmpegPath = cfg.ffmpegPath - } - scfg, err := session.ApplyAuthDefaults(cfg.scfg) if err != nil { return fmt.Errorf("validate config: %w", err) diff --git a/internal/config/config.go b/internal/config/config.go index 55fc9b3..dca5863 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -51,7 +51,6 @@ type File struct { Failover Failover `yaml:"failover"` Data string `yaml:"data"` Debug bool `yaml:"debug"` - FFmpeg string `yaml:"ffmpeg"` } // Profile is a failover entry that overrides top-level runtime fields. diff --git a/internal/transport/videochannel/codec_spec.go b/internal/transport/videochannel/codec_spec.go new file mode 100644 index 0000000..a548348 --- /dev/null +++ b/internal/transport/videochannel/codec_spec.go @@ -0,0 +1,75 @@ +package videochannel + +import ( + "errors" + "strings" + + "github.com/pion/rtp" + "github.com/pion/rtp/codecs" + "github.com/pion/webrtc/v4" +) + +// ErrUnexpectedFrameSize is returned when the raw frame size does not match expectations. +var ErrUnexpectedFrameSize = errors.New("unexpected encoder frame size") + +// codecSpec describes a video codec used by videochannel: its WebRTC +// capability and the depacketizer for inbound RTP streams. +type codecSpec struct { + mimeType string + capability webrtc.RTPCodecCapability + depacketizer func() rtp.Depacketizer +} + +// codecSpecForCarrier returns the codec used to negotiate outbound video for +// the given carrier. Currently every carrier uses VP8. +func codecSpecForCarrier(_ string) codecSpec { + return vp8CodecSpec() +} + +// codecSpecForMime returns the codec spec matching a WebRTC MIME type +// reported by the remote peer. +func codecSpecForMime(mimeType string) (codecSpec, bool) { + switch strings.ToLower(mimeType) { + case strings.ToLower(webrtc.MimeTypeH264): + return h264CodecSpec(), true + case strings.ToLower(webrtc.MimeTypeVP9): + return vp9CodecSpec(), true + case strings.ToLower(webrtc.MimeTypeVP8): + return vp8CodecSpec(), true + default: + return codecSpec{}, false + } +} + +func h264CodecSpec() codecSpec { + return codecSpec{ + mimeType: webrtc.MimeTypeH264, + capability: webrtc.RTPCodecCapability{ + MimeType: webrtc.MimeTypeH264, + ClockRate: 90000, + }, + depacketizer: func() rtp.Depacketizer { return &codecs.H264Packet{} }, + } +} + +func vp9CodecSpec() codecSpec { + return codecSpec{ + mimeType: webrtc.MimeTypeVP9, + capability: webrtc.RTPCodecCapability{ + MimeType: webrtc.MimeTypeVP9, + ClockRate: 90000, + }, + depacketizer: func() rtp.Depacketizer { return &codecs.VP9Packet{} }, + } +} + +func vp8CodecSpec() codecSpec { + return codecSpec{ + mimeType: webrtc.MimeTypeVP8, + capability: webrtc.RTPCodecCapability{ + MimeType: webrtc.MimeTypeVP8, + ClockRate: 90000, + }, + depacketizer: func() rtp.Depacketizer { return &codecs.VP8Packet{} }, + } +} diff --git a/internal/transport/videochannel/ffmpeg.go b/internal/transport/videochannel/ffmpeg.go deleted file mode 100644 index e2b6512..0000000 --- a/internal/transport/videochannel/ffmpeg.go +++ /dev/null @@ -1,600 +0,0 @@ -package videochannel - -import ( - "bytes" - "context" - "encoding/binary" - "errors" - "fmt" - "io" - "os" - "os/exec" - "strconv" - "strings" - "sync" - "sync/atomic" - "time" - - "github.com/pion/rtp" - "github.com/pion/rtp/codecs" - "github.com/pion/webrtc/v4" - "github.com/pion/webrtc/v4/pkg/media/ivfreader" -) - -const ( - ffmpegFrameTimeout = 10 * time.Second - - argCodecVideo = "-c:v" - argPixFmt = "-pix_fmt" - codecLibVPX = "libvpx" - pixFmtYUV420P = "yuv420p" -) - -var ( - // ErrFFmpegUnavailable is returned when ffmpeg is not available on PATH. - ErrFFmpegUnavailable = errors.New("ffmpeg is required for videochannel") - // ErrUnsupportedVideoCodec is returned when videochannel cannot decode the negotiated codec. - ErrUnsupportedVideoCodec = errors.New("unsupported video codec") - // ErrEncoderTimeout is returned when the encoder does not produce a frame within the deadline. - ErrEncoderTimeout = errors.New("encoder timeout") - // ErrPopFrameTimeout is returned when no decoded frame is available within the deadline. - ErrPopFrameTimeout = errors.New("pop frame timeout") - // ErrUnexpectedFrameSize is returned when the raw frame size does not match expectations. - ErrUnexpectedFrameSize = errors.New("unexpected encoder frame size") -) - -// FFmpegPath defines the path to the ffmpeg executable. -// -//nolint:gochecknoglobals // operator-controlled config overridden via CLI flag or FFMPEG_BIN env. -var FFmpegPath = "ffmpeg" - -type codecSpec struct { - mimeType string - fourCC string - encoder string - capability webrtc.RTPCodecCapability - depacketizer func() rtp.Depacketizer - encodeArgs []string -} - -func codecSpecForCarrier(_ string) codecSpec { - return vp8CodecSpec() -} - -func codecSpecForMime(mimeType string) (codecSpec, bool) { - switch strings.ToLower(mimeType) { - case strings.ToLower(webrtc.MimeTypeH264): - return h264CodecSpec(), true - case strings.ToLower(webrtc.MimeTypeVP9): - return vp9CodecSpec(), true - case strings.ToLower(webrtc.MimeTypeVP8): - return vp8CodecSpec(), true - default: - return codecSpec{}, false - } -} - -func h264CodecSpec() codecSpec { - return codecSpec{ - mimeType: webrtc.MimeTypeH264, - fourCC: "H264", - encoder: "libx264", - capability: webrtc.RTPCodecCapability{ - MimeType: webrtc.MimeTypeH264, - ClockRate: 90000, - }, - depacketizer: func() rtp.Depacketizer { return &codecs.H264Packet{} }, - encodeArgs: []string{ - argCodecVideo, "libx264", - "-preset", "ultrafast", - "-tune", "zerolatency", - "-g", "1", - argPixFmt, pixFmtYUV420P, - }, - } -} - -func vp9CodecSpec() codecSpec { - return codecSpec{ - mimeType: webrtc.MimeTypeVP9, - fourCC: "VP90", - encoder: "libvpx-vp9", - capability: webrtc.RTPCodecCapability{ - MimeType: webrtc.MimeTypeVP9, - ClockRate: 90000, - }, - depacketizer: func() rtp.Depacketizer { return &codecs.VP9Packet{} }, - encodeArgs: []string{ - argCodecVideo, "libvpx-vp9", - "-deadline", "realtime", - "-cpu-used", "8", - "-lag-in-frames", "0", - "-error-resilient", "1", - "-static-thresh", "0", - "-g", "1", - argPixFmt, pixFmtYUV420P, - }, - } -} - -func vp8CodecSpec() codecSpec { - return codecSpec{ - mimeType: webrtc.MimeTypeVP8, - fourCC: "VP80", - encoder: codecLibVPX, - capability: webrtc.RTPCodecCapability{ - MimeType: webrtc.MimeTypeVP8, - ClockRate: 90000, - }, - depacketizer: func() rtp.Depacketizer { return &codecs.VP8Packet{} }, - encodeArgs: []string{ - argCodecVideo, codecLibVPX, - "-deadline", "realtime", - "-cpu-used", "8", - "-lag-in-frames", "0", - "-error-resilient", "1", - "-static-thresh", "0", - "-g", "1", - argPixFmt, pixFmtYUV420P, - }, - } -} - -func resolveEncoderCodec(spec codecSpec, hw string) string { - if hw != "nvenc" { - return spec.encoder - } - switch spec.mimeType { - case webrtc.MimeTypeH264: - return "h264_nvenc" - case webrtc.MimeTypeVP8: - return "vp8_nvenc" - case webrtc.MimeTypeVP9: - return "vp9_nvenc" - case webrtc.MimeTypeAV1: - return "av1_nvenc" - default: - return spec.encoder - } -} - -func buildEncoderArgs(spec codecSpec, vcodec string, width, height, fps int, bitrate string) []string { - args := []string{ - "-loglevel", "error", "-threads", "1", - "-f", "rawvideo", - argPixFmt, "gray", - "-video_size", strconv.Itoa(width) + "x" + strconv.Itoa(height), - "-framerate", strconv.Itoa(fps), - "-i", "pipe:0", - "-an", - } - - if strings.HasSuffix(vcodec, "_nvenc") { - args = append(args, argCodecVideo, vcodec, "-preset", "p1", "-tune", "ull", "-rc", "vbr") - } else { - args = append(args, spec.encodeArgs...) - } - - args = append(args, "-g", "1", argPixFmt, pixFmtYUV420P, "-b:v", bitrate) - - if spec.mimeType == webrtc.MimeTypeH264 { - return append(args, "-f", "h264", "pipe:1") - } - return append(args, "-f", "ivf", "pipe:1") -} - -type ffmpegEncoder struct { - cmd *exec.Cmd - stdin io.WriteCloser - stderr *bytes.Buffer - frames chan []byte - width int - height int - frameSize int - closed atomic.Bool - closeOnce sync.Once - errMu sync.Mutex - err error -} - -func newFFmpegEncoder( - ctx context.Context, - spec codecSpec, - width, height, fps int, - bitrate, hw string, -) (*ffmpegEncoder, error) { - ffmpegBin := FFmpegPath - if envBin := os.Getenv("FFMPEG_BIN"); envBin != "" { - ffmpegBin = envBin - } - - if ffmpegBin == "ffmpeg" { - if _, err := exec.LookPath("ffmpeg"); err != nil { - return nil, ErrFFmpegUnavailable - } - } else { - if _, err := os.Stat(ffmpegBin); err != nil { //nolint:gosec,lll // G703: ffmpegBin is operator-controlled config, not user input. - return nil, fmt.Errorf("%w: %w", ErrFFmpegUnavailable, err) - } - } - - vcodec := resolveEncoderCodec(spec, hw) - args := buildEncoderArgs(spec, vcodec, width, height, fps, bitrate) - - cmd := exec.CommandContext(ctx, ffmpegBin, args...) //nolint:gosec,lll // G204: ffmpeg path is operator-controlled config, not user input - stdin, err := cmd.StdinPipe() - if err != nil { - return nil, fmt.Errorf("encoder stdin: %w", err) - } - stdout, err := cmd.StdoutPipe() - if err != nil { - return nil, fmt.Errorf("encoder stdout: %w", err) - } - stderr := &bytes.Buffer{} - cmd.Stderr = stderr - - if err := cmd.Start(); err != nil { - return nil, fmt.Errorf("start encoder: %w", err) - } - - enc := &ffmpegEncoder{ - cmd: cmd, - stdin: stdin, - stderr: stderr, - frames: make(chan []byte, 8), - width: width, - height: height, - frameSize: width * height, - } - - if spec.mimeType == webrtc.MimeTypeH264 { - go enc.readRawH264(stdout) - } else { - go enc.readIVF(stdout) - } - return enc, nil -} - -func (e *ffmpegEncoder) EncodeFrame(frame []byte) ([]byte, error) { - if len(frame) != e.frameSize { - return nil, fmt.Errorf("%w: got %d expected %d", ErrUnexpectedFrameSize, len(frame), e.frameSize) - } - if err := e.processErr(); err != nil { - return nil, err - } - if err := writeAll(e.stdin, frame); err != nil { - return nil, fmt.Errorf("write encoder frame: %w", err) - } - - select { - case sample, ok := <-e.frames: - if !ok { - return nil, e.processErr() - } - return sample, nil - case <-time.After(ffmpegFrameTimeout): - if err := e.processErr(); err != nil { - return nil, err - } - return nil, ErrEncoderTimeout - } -} - -func (e *ffmpegEncoder) Close() error { - e.closeOnce.Do(func() { - e.closed.Store(true) - _ = e.stdin.Close() - if e.cmd.Process != nil { - _ = e.cmd.Process.Kill() - } - _ = e.cmd.Wait() - }) - return nil -} - -func (e *ffmpegEncoder) readIVF(stdout io.Reader) { - defer close(e.frames) - reader, _, err := ivfreader.NewWith(stdout) - if err != nil { - e.setErr(fmt.Errorf("encoder ivf header: %w", err)) - return - } - for { - frame, _, err := reader.ParseNextFrame() - if err != nil { - if !e.closed.Load() { - e.setErr(fmt.Errorf("encoder ivf read: %w", err)) - } - return - } - copyFrame := append([]byte(nil), frame...) - if e.closed.Load() { - return - } - e.frames <- copyFrame - } -} - -func (e *ffmpegEncoder) readRawH264(stdout io.Reader) { - defer close(e.frames) - buf := make([]byte, 1024*1024) - for { - n, err := stdout.Read(buf) - if err != nil { - if !e.closed.Load() { - e.setErr(fmt.Errorf("encoder h264 read: %w", err)) - } - return - } - if n > 0 { - copyFrame := append([]byte(nil), buf[:n]...) - if e.closed.Load() { - return - } - e.frames <- copyFrame - } - } -} - -func (e *ffmpegEncoder) setErr(err error) { - if err == nil { - return - } - e.errMu.Lock() - defer e.errMu.Unlock() - if e.err == nil { - e.err = withStderr(err, e.stderr) - } -} - -func (e *ffmpegEncoder) processErr() error { - e.errMu.Lock() - defer e.errMu.Unlock() - if e.err != nil { - return e.err - } - if e.closed.Load() { - return ErrTransportClosed - } - return nil -} - -func resolveDecoderName(spec codecSpec, hw string) string { - if hw != "nvenc" { - return strings.ToLower(strings.TrimPrefix(spec.mimeType, "video/")) - } - switch spec.mimeType { - case webrtc.MimeTypeH264: - return "h264_cuvid" - case webrtc.MimeTypeVP8: - return "vp8_cuvid" - case webrtc.MimeTypeVP9: - return "vp9_cuvid" - default: - return strings.ToLower(strings.TrimPrefix(spec.mimeType, "video/")) - } -} - -func buildDecoderArgs(spec codecSpec, decoderName string, width, height int, outputPixFmt string) []string { - args := []string{"-loglevel", "error", "-threads", "1"} - if spec.mimeType == webrtc.MimeTypeH264 { - args = append(args, "-f", "h264") - } else { - args = append(args, "-f", "ivf") - } - - vfFilter := fmt.Sprintf("scale=%d:%d:flags=neighbor,format=%s", width, height, outputPixFmt) - return append(args, - "-flags", "low_delay", - "-vcodec", decoderName, - "-i", "pipe:0", - "-an", - "-vf", vfFilter, - argPixFmt, outputPixFmt, - "-f", "rawvideo", - "pipe:1", - ) -} - -type ffmpegDecoder struct { - cmd *exec.Cmd - stdin io.WriteCloser - stderr *bytes.Buffer - frames chan []byte - pts uint64 - mimeType string - frameSize int - closed atomic.Bool - closeOnce sync.Once - errMu sync.Mutex - err error -} - -func newFFmpegDecoder( - ctx context.Context, - spec codecSpec, - width, height, fps int, - hw string, -) (*ffmpegDecoder, error) { - ffmpegBin := FFmpegPath - if envBin := os.Getenv("FFMPEG_BIN"); envBin != "" { - ffmpegBin = envBin - } - - if ffmpegBin == "ffmpeg" { - if _, err := exec.LookPath("ffmpeg"); err != nil { - return nil, ErrFFmpegUnavailable - } - } else { - if _, err := os.Stat(ffmpegBin); err != nil { //nolint:gosec,lll // G703: ffmpegBin is operator-controlled config, not user input. - return nil, fmt.Errorf("%w: %w", ErrFFmpegUnavailable, err) - } - } - - decoderName := resolveDecoderName(spec, hw) - args := buildDecoderArgs(spec, decoderName, width, height, "gray") - - cmd := exec.CommandContext(ctx, ffmpegBin, args...) //nolint:gosec,lll // G204: ffmpeg path is operator-controlled config, not user input - stdin, err := cmd.StdinPipe() - if err != nil { - return nil, fmt.Errorf("decoder stdin: %w", err) - } - stdout, err := cmd.StdoutPipe() - if err != nil { - return nil, fmt.Errorf("decoder stdout: %w", err) - } - stderr := &bytes.Buffer{} - cmd.Stderr = stderr - - if err := cmd.Start(); err != nil { - return nil, fmt.Errorf("start decoder: %w", err) - } - - dec := &ffmpegDecoder{ - cmd: cmd, - stdin: stdin, - stderr: stderr, - frames: make(chan []byte, 32), - mimeType: spec.mimeType, - frameSize: width * height, - } - - if spec.mimeType != webrtc.MimeTypeH264 { - if err := writeIVFHeader(stdin, spec.fourCC, width, height, fps); err != nil { - _ = dec.Close() - return nil, fmt.Errorf("decoder ivf header: %w", err) - } - } - - go dec.readRawFrames(stdout) - return dec, nil -} - -func (d *ffmpegDecoder) PushSample(sample []byte) error { - if err := d.processErr(); err != nil { - return err - } - if d.mimeType == webrtc.MimeTypeH264 { - if err := writeAll(d.stdin, sample); err != nil { - return fmt.Errorf("write h264 decoder frame: %w", err) - } - } else { - if err := writeIVFFrame(d.stdin, d.pts, sample); err != nil { - return fmt.Errorf("write ivf decoder frame: %w", err) - } - d.pts++ - } - return nil -} - -func (d *ffmpegDecoder) PopFrame() ([]byte, error) { - select { - case frame, ok := <-d.frames: - if !ok { - return nil, d.processErr() - } - return frame, nil - case <-time.After(10 * time.Second): - return nil, ErrPopFrameTimeout - } -} - -func (d *ffmpegDecoder) Close() error { - d.closeOnce.Do(func() { - d.closed.Store(true) - _ = d.stdin.Close() - if d.cmd.Process != nil { - _ = d.cmd.Process.Kill() - } - _ = d.cmd.Wait() - }) - return nil -} - -func (d *ffmpegDecoder) readRawFrames(stdout io.Reader) { - defer close(d.frames) - buf := make([]byte, d.frameSize) - for { - if _, err := io.ReadFull(stdout, buf); err != nil { - if !d.closed.Load() { - d.setErr(fmt.Errorf("decoder raw read: %w", err)) - } - return - } - copyFrame := append([]byte(nil), buf...) - if d.closed.Load() { - return - } - d.frames <- copyFrame - } -} - -func (d *ffmpegDecoder) setErr(err error) { - if err == nil { - return - } - d.errMu.Lock() - defer d.errMu.Unlock() - if d.err == nil { - d.err = withStderr(err, d.stderr) - } -} - -func (d *ffmpegDecoder) processErr() error { - d.errMu.Lock() - defer d.errMu.Unlock() - if d.err != nil { - return d.err - } - if d.closed.Load() { - return ErrTransportClosed - } - return nil -} - -func withStderr(err error, stderr *bytes.Buffer) error { - if err == nil { - return nil - } - msg := strings.TrimSpace(stderr.String()) - if msg == "" { - return err - } - return fmt.Errorf("%w: %s", err, msg) -} - -func writeIVFHeader(w io.Writer, fourCC string, width, height, frameRate int) error { - header := make([]byte, 32) - copy(header[0:4], []byte("DKIF")) - binary.LittleEndian.PutUint16(header[4:6], 0) - binary.LittleEndian.PutUint16(header[6:8], 32) - copy(header[8:12], []byte(fourCC)) - binary.LittleEndian.PutUint16(header[12:14], uint16(width)) //nolint:gosec,lll // G115: bounded conversion verified by surrounding logic - binary.LittleEndian.PutUint16(header[14:16], uint16(height)) //nolint:gosec,lll // G115: bounded conversion verified by surrounding logic - binary.LittleEndian.PutUint32(header[16:20], uint32(frameRate)) //nolint:gosec,lll // G115: bounded conversion verified by surrounding logic - binary.LittleEndian.PutUint32(header[20:24], 1) - binary.LittleEndian.PutUint32(header[24:28], 0) - binary.LittleEndian.PutUint32(header[28:32], 0) - return writeAll(w, header) -} - -func writeIVFFrame(w io.Writer, pts uint64, frame []byte) error { - header := make([]byte, 12) - binary.LittleEndian.PutUint32(header[0:4], uint32(len(frame))) //nolint:gosec,lll // G115: bounded conversion verified by surrounding logic - binary.LittleEndian.PutUint64(header[4:12], pts) - if err := writeAll(w, header); err != nil { - return err - } - return writeAll(w, frame) -} - -func writeAll(w io.Writer, data []byte) error { - for len(data) > 0 { - n, err := w.Write(data) - if err != nil { - return fmt.Errorf("write: %w", err) - } - data = data[n:] - } - return nil -} diff --git a/internal/transport/videochannel/frame_extra_test.go b/internal/transport/videochannel/frame_extra_test.go index bb27231..60425df 100644 --- a/internal/transport/videochannel/frame_extra_test.go +++ b/internal/transport/videochannel/frame_extra_test.go @@ -1,21 +1,12 @@ package videochannel import ( - "bytes" "errors" - "io" - "slices" - "strings" "testing" "github.com/pion/webrtc/v4" ) -var ( - errVideoFrameBase = errors.New("base") - errVideoFrameBoom = errors.New("boom") -) - func TestDecodeTransportFrameErrorsAndAck(t *testing.T) { tests := []struct { data []byte @@ -43,8 +34,7 @@ func TestDecodeTransportFrameErrorsAndAck(t *testing.T) { } } -//nolint:cyclop // table-driven test naturally has many branches -func TestCodecSpecsAndArgs(t *testing.T) { +func TestCodecSpecForMime(t *testing.T) { for _, mime := range []string{webrtc.MimeTypeH264, webrtc.MimeTypeVP8, webrtc.MimeTypeVP9} { spec, ok := codecSpecForMime(mime) if !ok { @@ -57,188 +47,7 @@ func TestCodecSpecsAndArgs(t *testing.T) { if _, ok := codecSpecForMime("video/unknown"); ok { t.Fatal("codecSpecForMime() accepted unknown mime") } - - if got := resolveEncoderCodec(h264CodecSpec(), "nvenc"); got != "h264_nvenc" { - t.Fatalf("resolveEncoderCodec(h264,nvenc) = %q", got) - } - if got := resolveEncoderCodec(vp8CodecSpec(), "none"); got != "libvpx" { - t.Fatalf("resolveEncoderCodec(vp8,none) = %q", got) - } - if got := resolveEncoderCodec(vp9CodecSpec(), "nvenc"); got != "vp9_nvenc" { - t.Fatalf("resolveEncoderCodec(vp9,nvenc) = %q", got) - } - if got := resolveEncoderCodec(codecSpec{mimeType: webrtc.MimeTypeAV1, encoder: "libaom-av1"}, "nvenc"); got != "av1_nvenc" { //nolint:lll // long test description - t.Fatalf("resolveEncoderCodec(av1,nvenc) = %q", got) - } - - args := buildEncoderArgs(vp8CodecSpec(), "vp8_nvenc", 320, 240, 30, "1M") - for _, want := range []string{"-video_size", "320x240", "-framerate", "30", "vp8_nvenc", "-b:v", "1M", "ivf"} { - if !slices.Contains(args, want) { - t.Fatalf("buildEncoderArgs() = %v, missing %q", args, want) - } - } - h264Args := buildEncoderArgs(h264CodecSpec(), "libx264", 320, 240, 30, "1M") - if h264Args[len(h264Args)-2] != "h264" { - t.Fatalf("h264 encoder args = %v", h264Args) - } - - if got := resolveDecoderName(h264CodecSpec(), "nvenc"); got != "h264_cuvid" { - t.Fatalf("resolveDecoderName(h264,nvenc) = %q", got) - } - if got := resolveDecoderName(vp8CodecSpec(), "nvenc"); got != "vp8_cuvid" { - t.Fatalf("resolveDecoderName(vp8,nvenc) = %q", got) - } - if got := resolveDecoderName(vp9CodecSpec(), "nvenc"); got != "vp9_cuvid" { - t.Fatalf("resolveDecoderName(vp9,nvenc) = %q", got) - } - if got := resolveDecoderName(codecSpec{mimeType: "video/custom"}, "none"); got != "custom" { - t.Fatalf("resolveDecoderName(custom,none) = %q", got) - } - decArgs := buildDecoderArgs(vp8CodecSpec(), "vp8", 320, 240, "gray") - for _, want := range []string{"-f", "ivf", "-vcodec", "vp8", "scale=320:240:flags=neighbor,format=gray", "rawvideo"} { - if !slices.Contains(decArgs, want) { - t.Fatalf("buildDecoderArgs(vp8) = %v, missing %q", decArgs, want) - } - } - h264DecArgs := buildDecoderArgs(h264CodecSpec(), "h264", 320, 240, "gray") - if h264DecArgs[5] != "h264" { - t.Fatalf("buildDecoderArgs(h264) = %v", h264DecArgs) - } -} - -type shortWriter struct { - writes int -} - -func (w *shortWriter) Write(p []byte) (int, error) { - w.writes++ - if w.writes == 1 { - return 1, nil - } - return len(p), nil -} - -type errWriter struct{} - -func (w errWriter) Write([]byte) (int, error) { return 0, io.ErrClosedPipe } - -type bufferWriteCloser struct { - bytes.Buffer -} - -func (w *bufferWriteCloser) Close() error { return nil } - -//nolint:cyclop // table-driven test naturally has many branches -func TestIVFWritersAndWithStderr(t *testing.T) { - var buf bytes.Buffer - if err := writeIVFHeader(&buf, "VP80", 320, 240, 30); err != nil { - t.Fatalf("writeIVFHeader() error = %v", err) - } - if buf.Len() != 32 || string(buf.Bytes()[:4]) != "DKIF" { - t.Fatalf("IVF header = %v", buf.Bytes()) - } - - buf.Reset() - if err := writeIVFFrame(&buf, 3, []byte("abc")); err != nil { - t.Fatalf("writeIVFFrame() error = %v", err) - } - if buf.Len() != 15 { - t.Fatalf("IVF frame len = %d, want 15", buf.Len()) - } - - if err := writeAll(&shortWriter{}, []byte("abc")); err != nil { - t.Fatalf("writeAll(shortWriter) error = %v", err) - } - if err := writeAll(errWriter{}, []byte("abc")); err == nil || !strings.Contains(err.Error(), "write:") { - t.Fatalf("writeAll(errWriter) error = %v", err) - } - - baseErr := errVideoFrameBase - if got := withStderr(baseErr, bytes.NewBufferString(" details \n")); got == nil || got.Error() != "base: details" { - t.Fatalf("withStderr() = %v", got) - } - if got := withStderr(nil, bytes.NewBufferString("details")); got != nil { - t.Fatalf("withStderr(nil) = %v", got) - } -} - -func TestFFmpegProcessErrAndFrameValidation(t *testing.T) { - enc := &ffmpegEncoder{ - stderr: bytes.NewBufferString("encoder failed"), - frames: make(chan []byte, 1), - frameSize: 4, - } - if _, err := enc.EncodeFrame([]byte("bad")); !errors.Is(err, ErrUnexpectedFrameSize) { - t.Fatalf("EncodeFrame(short) error = %v, want %v", err, ErrUnexpectedFrameSize) - } - enc.setErr(errVideoFrameBoom) - if _, err := enc.EncodeFrame([]byte("good")); err == nil || !strings.Contains(err.Error(), "encoder failed") { - t.Fatalf("EncodeFrame(processErr) error = %v", err) - } - - dec := &ffmpegDecoder{stderr: bytes.NewBufferString("decoder failed")} - dec.setErr(errVideoFrameBoom) - if err := dec.PushSample([]byte("sample")); err == nil || !strings.Contains(err.Error(), "decoder failed") { - t.Fatalf("PushSample(processErr) error = %v", err) - } - closed := &ffmpegDecoder{} - closed.closed.Store(true) - if err := closed.processErr(); !errors.Is(err, ErrTransportClosed) { - t.Fatalf("decoder processErr(closed) = %v, want %v", err, ErrTransportClosed) - } -} - -//nolint:cyclop // table-driven test naturally has many branches -func TestFFmpegReadersAndSampleWriters(t *testing.T) { - var ivf bytes.Buffer - if err := writeIVFHeader(&ivf, "VP80", 2, 2, 30); err != nil { - t.Fatalf("writeIVFHeader() error = %v", err) - } - if err := writeIVFFrame(&ivf, 1, []byte("frame")); err != nil { - t.Fatalf("writeIVFFrame() error = %v", err) - } - enc := &ffmpegEncoder{stderr: &bytes.Buffer{}, frames: make(chan []byte, 2)} - enc.readIVF(&ivf) - if got := <-enc.frames; !bytes.Equal(got, []byte("frame")) { - t.Fatalf("readIVF frame = %q", got) - } - - enc = &ffmpegEncoder{stderr: &bytes.Buffer{}, frames: make(chan []byte, 2)} - enc.readRawH264(bytes.NewBufferString("h264")) - if got := <-enc.frames; !bytes.Equal(got, []byte("h264")) { - t.Fatalf("readRawH264 frame = %q", got) - } - - dec := &ffmpegDecoder{stderr: &bytes.Buffer{}, frames: make(chan []byte, 2), frameSize: 4} - dec.readRawFrames(bytes.NewBufferString("aaaabbbb")) - if got := <-dec.frames; !bytes.Equal(got, []byte("aaaa")) { - t.Fatalf("readRawFrames first = %q", got) - } - if got := <-dec.frames; !bytes.Equal(got, []byte("bbbb")) { - t.Fatalf("readRawFrames second = %q", got) - } - - h264In := &bufferWriteCloser{} - dec = &ffmpegDecoder{stdin: h264In, mimeType: webrtc.MimeTypeH264} - if err := dec.PushSample([]byte("sample")); err != nil { - t.Fatalf("PushSample(h264) error = %v", err) - } - if h264In.String() != "sample" { - t.Fatalf("h264 stdin = %q", h264In.String()) - } - - ivfIn := &bufferWriteCloser{} - dec = &ffmpegDecoder{stdin: ivfIn, mimeType: webrtc.MimeTypeVP8} - if err := dec.PushSample([]byte("vp8")); err != nil { - t.Fatalf("PushSample(vp8) error = %v", err) - } - if ivfIn.Len() != 12+len("vp8") || dec.pts != 1 { - t.Fatalf("ivf stdin len=%d pts=%d", ivfIn.Len(), dec.pts) - } - - dec = &ffmpegDecoder{frames: make(chan []byte, 1)} - dec.frames <- []byte("ready") - if got, err := dec.PopFrame(); err != nil || !bytes.Equal(got, []byte("ready")) { - t.Fatalf("PopFrame() = %q, %v", got, err) + if got := codecSpecForCarrier("any-carrier"); got.mimeType != webrtc.MimeTypeVP8 { + t.Fatalf("codecSpecForCarrier() = %+v, want vp8", got) } } diff --git a/internal/transport/videochannel/gocodec.go b/internal/transport/videochannel/gocodec.go index 0ef463d..d440826 100644 --- a/internal/transport/videochannel/gocodec.go +++ b/internal/transport/videochannel/gocodec.go @@ -9,7 +9,7 @@ import ( "codeberg.org/rape4me/kc/vp8" ) -// goEncoder is a pure Go VP8 encoder replacing ffmpegEncoder. +// goEncoder is a pure Go VP8 encoder. type goEncoder struct { enc *vp8.Encoder width int @@ -39,7 +39,11 @@ func (e *goEncoder) EncodeFrame(frame []byte) ([]byte, error) { } e.mu.Lock() defer e.mu.Unlock() - return e.enc.Encode(frame) + encoded, err := e.enc.Encode(frame) + if err != nil { + return nil, fmt.Errorf("vp8 encode: %w", err) + } + return encoded, nil } func (e *goEncoder) Close() error { @@ -47,7 +51,7 @@ func (e *goEncoder) Close() error { return nil } -// goDecoder is a pure Go VP8 decoder replacing ffmpegDecoder. +// goDecoder is a pure Go VP8 decoder. type goDecoder struct { dec *vp8.Decoder width int diff --git a/internal/transport/videochannel/transport_unit_test.go b/internal/transport/videochannel/transport_unit_test.go index df78313..25169fc 100644 --- a/internal/transport/videochannel/transport_unit_test.go +++ b/internal/transport/videochannel/transport_unit_test.go @@ -252,7 +252,7 @@ func TestOutboundPriorityRenderAndClosedEnqueue(t *testing.T) { // per-attempt ack budget covers a full FPS-paced round trip through every // fragment. Without this, multi-fragment payloads trigger premature // retransmits that pile fragments into the outbound channel and starve -// the ffmpeg encoder until it is killed. +// the encoder until it is killed. func TestPerAttemptAckTimeoutScalesWithFragments(t *testing.T) { // Tiny payload: floor at defaultAckTimeout. if got := perAttemptAckTimeout(1, 25); got != defaultAckTimeout {