diff --git a/go.mod b/go.mod index b8cc304..0b0391f 100644 --- a/go.mod +++ b/go.mod @@ -1,8 +1,9 @@ module github.com/openlibrecommunity/olcrtc -go 1.26 +go 1.26.3 require ( + codeberg.org/rape4me/kc v0.0.0-20260526155601-5f6fc8dfaaa1 github.com/google/uuid v1.6.0 github.com/gorilla/websocket v1.5.4-0.20250319132907-e064f32e3674 github.com/livekit/protocol v1.46.0 @@ -10,6 +11,7 @@ require ( github.com/magefile/mage v1.17.2 github.com/pion/interceptor v0.1.45 github.com/pion/logging v0.2.4 + github.com/pion/rtcp v1.2.16 github.com/pion/rtp v1.10.2 github.com/pion/webrtc/v4 v4.2.13 github.com/xtaci/kcp-go/v5 v5.6.72 @@ -59,7 +61,6 @@ require ( github.com/pion/ice/v4 v4.2.5 // indirect github.com/pion/mdns/v2 v2.1.0 // indirect github.com/pion/randutil v0.1.0 // indirect - github.com/pion/rtcp v1.2.16 // indirect github.com/pion/sctp v1.10.0 // indirect github.com/pion/sdp/v3 v3.0.18 // indirect github.com/pion/srtp/v3 v3.0.10 // indirect diff --git a/go.sum b/go.sum index 2d39532..5403756 100644 --- a/go.sum +++ b/go.sum @@ -7,6 +7,8 @@ buf.build/go/protoyaml v0.7.0/go.mod h1:+a0cavd0uMvirb87xdu2ZMMmjlIQoiH/N2Ich5MG cel.dev/expr v0.25.2 h1:K6j46C81hXtZQfuX60cVWQFBJahKSE2gfRbNuvr5bFs= cel.dev/expr v0.25.2/go.mod h1:hrXvqGP6G6gyx8UAHSHJ5RGk//1Oj5nXQ2NI02Nrsg4= cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= +codeberg.org/rape4me/kc v0.0.0-20260526155601-5f6fc8dfaaa1 h1:OwaN5gXJoYSgdT8+LJ/1oHf+MqI+wXIizC4yIRA9UYU= +codeberg.org/rape4me/kc v0.0.0-20260526155601-5f6fc8dfaaa1/go.mod h1:ooInikVAZhJE+m+gHIekq52ZFkUPcCZAvY4u/m2M/V0= dario.cat/mergo v1.0.0 h1:AGCNq9Evsj31mOgNPcLyXc+4PNABt905YmuqPYYpBWk= dario.cat/mergo v1.0.0/go.mod h1:uNxQE+84aUszobStD9th8a29P2fMDhsBdgRYvZOxGmk= github.com/Azure/go-ansiterm v0.0.0-20230124172434-306776ec8161 h1:L/gRVlceqvL25UVaW/CKtUDjefjrs0SPonmDGUVOYP0= diff --git a/internal/transport/videochannel/gocodec.go b/internal/transport/videochannel/gocodec.go new file mode 100644 index 0000000..144ee73 --- /dev/null +++ b/internal/transport/videochannel/gocodec.go @@ -0,0 +1,105 @@ +package videochannel + +import ( + "fmt" + "sync" + "sync/atomic" + + "codeberg.org/rape4me/kc/vp8" +) + +// goEncoder is a pure Go VP8 encoder replacing ffmpegEncoder. +type goEncoder struct { + enc *vp8.Encoder + width int + height int + frameSize int + closed atomic.Bool + mu sync.Mutex +} + +func newGoEncoder(width, height, _ int) *goEncoder { + return &goEncoder{ + enc: vp8.NewEncoder(width, height, 10), + width: width, + height: height, + frameSize: width * height, + } +} + +func (e *goEncoder) EncodeFrame(frame []byte) ([]byte, error) { + if e.closed.Load() { + return nil, ErrTransportClosed + } + if len(frame) != e.frameSize { + return nil, fmt.Errorf("%w: got %d expected %d", ErrUnexpectedFrameSize, len(frame), e.frameSize) + } + e.mu.Lock() + defer e.mu.Unlock() + return e.enc.Encode(frame) +} + +func (e *goEncoder) Close() error { + e.closed.Store(true) + return nil +} + +// goDecoder is a pure Go VP8 decoder replacing ffmpegDecoder. +type goDecoder struct { + dec *vp8.Decoder + width int + height int + frameSize int + frames chan []byte + closed atomic.Bool + closeOnce sync.Once + closeCh chan struct{} +} + +func newGoDecoder(width, height int) *goDecoder { + return &goDecoder{ + dec: vp8.NewDecoder(), + width: width, + height: height, + frameSize: width * height, + frames: make(chan []byte, 32), + closeCh: make(chan struct{}), + } +} + +func (d *goDecoder) PushSample(sample []byte) error { + if d.closed.Load() { + return ErrTransportClosed + } + frame, err := d.dec.Decode(sample) + if err != nil { + return fmt.Errorf("vp8 decode: %w", err) + } + gray := frame.Grayscale() + select { + case d.frames <- gray: + case <-d.closeCh: + return ErrTransportClosed + } + return nil +} + +func (d *goDecoder) PopFrame() ([]byte, error) { + select { + case frame, ok := <-d.frames: + if !ok { + return nil, ErrTransportClosed + } + return frame, nil + case <-d.closeCh: + return nil, ErrTransportClosed + } +} + +func (d *goDecoder) Close() error { + d.closeOnce.Do(func() { + d.closed.Store(true) + close(d.closeCh) + }) + return nil +} diff --git a/internal/transport/videochannel/transport.go b/internal/transport/videochannel/transport.go index 3ad7854..1c3d329 100644 --- a/internal/transport/videochannel/transport.go +++ b/internal/transport/videochannel/transport.go @@ -58,10 +58,10 @@ type streamTransport struct { stream videoSession track *webrtc.TrackLocalStaticSample codec codecSpec - encoder *ffmpegEncoder + encoder *goEncoder encoderMu sync.Mutex decoderMu sync.Mutex - decoders map[*ffmpegDecoder]struct{} + decoders map[*goDecoder]struct{} onData func([]byte) outbound chan []byte outboundAck chan []byte @@ -157,7 +157,7 @@ func New(ctx context.Context, cfg transport.Config) (transport.Transport, error) outboundAck: make(chan []byte, 64), closeCh: make(chan struct{}), writerDone: make(chan struct{}), - decoders: make(map[*ffmpegDecoder]struct{}), + decoders: make(map[*goDecoder]struct{}), fragAcks: newFragAckTracker(), reassembler: common.NewReassembler(256), videoW: opts.Width, @@ -189,10 +189,7 @@ func (p *streamTransport) Connect(ctx context.Context) error { connectCtx, cancel := context.WithTimeout(ctx, defaultConnectTimeout) defer cancel() - encoder, err := newFFmpegEncoder(ctx, p.codec, p.videoW, p.videoH, p.videoFPS, p.videoBitrate, p.videoHW) - if err != nil { - return fmt.Errorf("new encoder: %w", err) - } + encoder := newGoEncoder(p.videoW, p.videoH, p.videoFPS) if err := p.stream.Connect(connectCtx); err != nil { _ = encoder.Close() @@ -390,7 +387,7 @@ func (p *streamTransport) Features() transport.Features { } } -func (p *streamTransport) writeIdleFrame(enc *ffmpegEncoder, frameDuration time.Duration) { +func (p *streamTransport) writeIdleFrame(enc *goEncoder, frameDuration time.Duration) { p.idleFrameMu.Lock() cached := p.idleFrame p.idleFrameMu.Unlock() @@ -415,7 +412,7 @@ func (p *streamTransport) writeIdleFrame(enc *ffmpegEncoder, frameDuration time. _ = p.track.WriteSample(media.Sample{Data: cached, Duration: frameDuration}) } -func (p *streamTransport) writePayloadFrame(enc *ffmpegEncoder, payload []byte, frameDuration time.Duration) { +func (p *streamTransport) writePayloadFrame(enc *goEncoder, payload []byte, frameDuration time.Duration) { rawFrame, err := p.renderFrame(payload) if err != nil { logger.Debugf("videochannel render error: %v", err) @@ -521,7 +518,7 @@ func (p *streamTransport) enqueueFrame(frame []byte, priority bool) error { } } -func (p *streamTransport) popDecoderFrames(decoder *ffmpegDecoder) { +func (p *streamTransport) popDecoderFrames(decoder *goDecoder) { defer func() { p.decoderMu.Lock() if p.decoders != nil { @@ -549,7 +546,7 @@ func (p *streamTransport) popDecoderFrames(decoder *ffmpegDecoder) { } } -func (p *streamTransport) readDecoderInput(track *webrtc.TrackRemote, decoder *ffmpegDecoder, codec codecSpec) { +func (p *streamTransport) readDecoderInput(track *webrtc.TrackRemote, decoder *goDecoder, codec codecSpec) { sb := samplebuilder.New(sampleBuilderMaxLate, codec.depacketizer(), track.Codec().ClockRate) for { select { @@ -583,11 +580,7 @@ func (p *streamTransport) handleRemoteTrack(track *webrtc.TrackRemote, _ *webrtc return } - decoder, err := newFFmpegDecoder(p.runCtx, codec, p.videoW, p.videoH, p.videoFPS, p.videoHW) - if err != nil { - logger.Warnf("videochannel decoder init failed: %v", err) - return - } + decoder := newGoDecoder(p.videoW, p.videoH) p.decoderMu.Lock() if p.closed.Load() || p.decoders == nil {