From bf454145ba84525729b562222a968cef08042d5e Mon Sep 17 00:00:00 2001 From: zarazaex69 Date: Thu, 30 Apr 2026 08:14:05 +0300 Subject: [PATCH] feat(lcm): implement strict lifecycle management for both encoders and decoders --- internal/transport/videochannel/transport.go | 156 +++++++++++++------ 1 file changed, 112 insertions(+), 44 deletions(-) diff --git a/internal/transport/videochannel/transport.go b/internal/transport/videochannel/transport.go index c5a7ba0..dcafa0b 100644 --- a/internal/transport/videochannel/transport.go +++ b/internal/transport/videochannel/transport.go @@ -38,30 +38,33 @@ var ( ) type streamTransport struct { - stream carrier.VideoTrack - track *webrtc.TrackLocalStaticSample - codec codecSpec - encoder *ffmpegEncoder - onData func([]byte) - outbound chan []byte - outboundAck chan []byte - closeCh chan struct{} - writerDone chan struct{} - nextSeq atomic.Uint32 - closed atomic.Bool - writerUp atomic.Bool - sendMu sync.Mutex - startWriter sync.Once - ackMu sync.Mutex - ackWaiters map[uint32]chan uint32 - recvMu sync.Mutex - inbound map[uint32]*inboundMessage - delivered map[uint32]uint32 - videoW int - videoH int - videoFPS int - videoBitrate string - videoHW string + stream carrier.VideoTrack + track *webrtc.TrackLocalStaticSample + codec codecSpec + encoder *ffmpegEncoder + encoderMu sync.Mutex + decoder *ffmpegDecoder + decoderMu sync.Mutex + onData func([]byte) + outbound chan []byte + outboundAck chan []byte + closeCh chan struct{} + writerDone chan struct{} + nextSeq atomic.Uint32 + closed atomic.Bool + writerUp atomic.Bool + sendMu sync.Mutex + startWriter sync.Once + ackMu sync.Mutex + ackWaiters map[uint32]chan uint32 + recvMu sync.Mutex + inbound map[uint32]*inboundMessage + delivered map[uint32]uint32 + videoW int + videoH int + videoFPS int + videoBitrate string + videoHW string videoQRSize int videoQRRecovery string videoCodec string @@ -115,22 +118,22 @@ func New(ctx context.Context, cfg transport.Config) (transport.Transport, error) } tr := &streamTransport{ - stream: stream, - track: track, - codec: codec, - onData: cfg.OnData, - outbound: make(chan []byte, 256), - outboundAck: make(chan []byte, 64), - closeCh: make(chan struct{}), - writerDone: make(chan struct{}), - ackWaiters: make(map[uint32]chan uint32), - inbound: make(map[uint32]*inboundMessage), - delivered: make(map[uint32]uint32), - videoW: cfg.VideoWidth, - videoH: cfg.VideoHeight, - videoFPS: cfg.VideoFPS, - videoBitrate: cfg.VideoBitrate, - videoHW: cfg.VideoHW, + stream: stream, + track: track, + codec: codec, + onData: cfg.OnData, + outbound: make(chan []byte, 256), + outboundAck: make(chan []byte, 64), + closeCh: make(chan struct{}), + writerDone: make(chan struct{}), + ackWaiters: make(map[uint32]chan uint32), + inbound: make(map[uint32]*inboundMessage), + delivered: make(map[uint32]uint32), + videoW: cfg.VideoWidth, + videoH: cfg.VideoHeight, + videoFPS: cfg.VideoFPS, + videoBitrate: cfg.VideoBitrate, + videoHW: cfg.VideoHW, videoQRSize: qrSize, videoQRRecovery: cfg.VideoQRRecovery, videoCodec: cfg.VideoCodec, @@ -161,7 +164,18 @@ func (p *streamTransport) Connect(ctx context.Context) error { return err } + p.encoderMu.Lock() + if p.closed.Load() { + p.encoderMu.Unlock() + _ = encoder.Close() + return ErrTransportClosed + } + if p.encoder != nil { + _ = p.encoder.Close() + } p.encoder = encoder + p.encoderMu.Unlock() + p.startWriter.Do(func() { p.writerUp.Store(true) go p.writerLoop() @@ -222,9 +236,19 @@ func (p *streamTransport) Send(data []byte) error { func (p *streamTransport) Close() error { if p.closed.CompareAndSwap(false, true) { close(p.closeCh) + + p.encoderMu.Lock() if p.encoder != nil { _ = p.encoder.Close() } + p.encoderMu.Unlock() + + p.decoderMu.Lock() + if p.decoder != nil { + _ = p.decoder.Close() + } + p.decoderMu.Unlock() + if p.writerUp.Load() { <-p.writerDone } @@ -275,6 +299,8 @@ func (p *streamTransport) Features() transport.Features { func (p *streamTransport) writerLoop() { defer close(p.writerDone) defer func() { + p.encoderMu.Lock() + defer p.encoderMu.Unlock() if p.encoder != nil { _ = p.encoder.Close() } @@ -301,7 +327,15 @@ func (p *streamTransport) writerLoop() { continue } - sample, err := p.encoder.EncodeFrame(rawFrame) + p.encoderMu.Lock() + enc := p.encoder + p.encoderMu.Unlock() + + if enc == nil { + continue + } + + sample, err := enc.EncodeFrame(rawFrame) if err != nil { logger.Warnf("videochannel encoder error: %v", err) continue @@ -367,12 +401,38 @@ func (p *streamTransport) handleRemoteTrack(track *webrtc.TrackRemote, _ *webrtc return } + p.decoderMu.Lock() + if p.closed.Load() { + p.decoderMu.Unlock() + _ = decoder.Close() + return + } + if p.decoder != nil { + _ = p.decoder.Close() + } + p.decoder = decoder + p.decoderMu.Unlock() + go func() { - defer func() { _ = decoder.Close() }() + defer func() { + p.decoderMu.Lock() + if p.decoder == decoder { + p.decoder = nil + } + p.decoderMu.Unlock() + _ = decoder.Close() + }() + for { + select { + case <-p.closeCh: + return + default: + } + frame, err := decoder.PopFrame() if err != nil { - if !errors.Is(err, ErrTransportClosed) { + if !errors.Is(err, ErrTransportClosed) && !p.closed.Load() { logger.Warnf("videochannel decoder pop error: %v", err) } return @@ -384,6 +444,12 @@ func (p *streamTransport) handleRemoteTrack(track *webrtc.TrackRemote, _ *webrtc go func() { sb := samplebuilder.New(sampleBuilderMaxLate, codec.depacketizer(), track.Codec().ClockRate) for { + select { + case <-p.closeCh: + return + default: + } + packet, _, err := track.ReadRTP() if err != nil { sb.Flush() @@ -393,7 +459,9 @@ func (p *streamTransport) handleRemoteTrack(track *webrtc.TrackRemote, _ *webrtc sb.Push(packet) for sample := sb.Pop(); sample != nil; sample = sb.Pop() { if err := decoder.PushSample(sample.Data); err != nil { - logger.Warnf("videochannel decoder push error: %v", err) + if !p.closed.Load() { + logger.Warnf("videochannel decoder push error: %v", err) + } return } }