feat(lcm): implement strict lifecycle management for both encoders and

decoders
This commit is contained in:
zarazaex69
2026-04-30 08:14:05 +03:00
parent b4408715bb
commit bf454145ba

View File

@@ -38,30 +38,33 @@ var (
)
type streamTransport struct {
stream carrier.VideoTrack
track *webrtc.TrackLocalStaticSample
codec codecSpec
encoder *ffmpegEncoder
onData func([]byte)
outbound chan []byte
outboundAck chan []byte
closeCh chan struct{}
writerDone chan struct{}
nextSeq atomic.Uint32
closed atomic.Bool
writerUp atomic.Bool
sendMu sync.Mutex
startWriter sync.Once
ackMu sync.Mutex
ackWaiters map[uint32]chan uint32
recvMu sync.Mutex
inbound map[uint32]*inboundMessage
delivered map[uint32]uint32
videoW int
videoH int
videoFPS int
videoBitrate string
videoHW string
stream carrier.VideoTrack
track *webrtc.TrackLocalStaticSample
codec codecSpec
encoder *ffmpegEncoder
encoderMu sync.Mutex
decoder *ffmpegDecoder
decoderMu sync.Mutex
onData func([]byte)
outbound chan []byte
outboundAck chan []byte
closeCh chan struct{}
writerDone chan struct{}
nextSeq atomic.Uint32
closed atomic.Bool
writerUp atomic.Bool
sendMu sync.Mutex
startWriter sync.Once
ackMu sync.Mutex
ackWaiters map[uint32]chan uint32
recvMu sync.Mutex
inbound map[uint32]*inboundMessage
delivered map[uint32]uint32
videoW int
videoH int
videoFPS int
videoBitrate string
videoHW string
videoQRSize int
videoQRRecovery string
videoCodec string
@@ -115,22 +118,22 @@ func New(ctx context.Context, cfg transport.Config) (transport.Transport, error)
}
tr := &streamTransport{
stream: stream,
track: track,
codec: codec,
onData: cfg.OnData,
outbound: make(chan []byte, 256),
outboundAck: make(chan []byte, 64),
closeCh: make(chan struct{}),
writerDone: make(chan struct{}),
ackWaiters: make(map[uint32]chan uint32),
inbound: make(map[uint32]*inboundMessage),
delivered: make(map[uint32]uint32),
videoW: cfg.VideoWidth,
videoH: cfg.VideoHeight,
videoFPS: cfg.VideoFPS,
videoBitrate: cfg.VideoBitrate,
videoHW: cfg.VideoHW,
stream: stream,
track: track,
codec: codec,
onData: cfg.OnData,
outbound: make(chan []byte, 256),
outboundAck: make(chan []byte, 64),
closeCh: make(chan struct{}),
writerDone: make(chan struct{}),
ackWaiters: make(map[uint32]chan uint32),
inbound: make(map[uint32]*inboundMessage),
delivered: make(map[uint32]uint32),
videoW: cfg.VideoWidth,
videoH: cfg.VideoHeight,
videoFPS: cfg.VideoFPS,
videoBitrate: cfg.VideoBitrate,
videoHW: cfg.VideoHW,
videoQRSize: qrSize,
videoQRRecovery: cfg.VideoQRRecovery,
videoCodec: cfg.VideoCodec,
@@ -161,7 +164,18 @@ func (p *streamTransport) Connect(ctx context.Context) error {
return err
}
p.encoderMu.Lock()
if p.closed.Load() {
p.encoderMu.Unlock()
_ = encoder.Close()
return ErrTransportClosed
}
if p.encoder != nil {
_ = p.encoder.Close()
}
p.encoder = encoder
p.encoderMu.Unlock()
p.startWriter.Do(func() {
p.writerUp.Store(true)
go p.writerLoop()
@@ -222,9 +236,19 @@ func (p *streamTransport) Send(data []byte) error {
func (p *streamTransport) Close() error {
if p.closed.CompareAndSwap(false, true) {
close(p.closeCh)
p.encoderMu.Lock()
if p.encoder != nil {
_ = p.encoder.Close()
}
p.encoderMu.Unlock()
p.decoderMu.Lock()
if p.decoder != nil {
_ = p.decoder.Close()
}
p.decoderMu.Unlock()
if p.writerUp.Load() {
<-p.writerDone
}
@@ -275,6 +299,8 @@ func (p *streamTransport) Features() transport.Features {
func (p *streamTransport) writerLoop() {
defer close(p.writerDone)
defer func() {
p.encoderMu.Lock()
defer p.encoderMu.Unlock()
if p.encoder != nil {
_ = p.encoder.Close()
}
@@ -301,7 +327,15 @@ func (p *streamTransport) writerLoop() {
continue
}
sample, err := p.encoder.EncodeFrame(rawFrame)
p.encoderMu.Lock()
enc := p.encoder
p.encoderMu.Unlock()
if enc == nil {
continue
}
sample, err := enc.EncodeFrame(rawFrame)
if err != nil {
logger.Warnf("videochannel encoder error: %v", err)
continue
@@ -367,12 +401,38 @@ func (p *streamTransport) handleRemoteTrack(track *webrtc.TrackRemote, _ *webrtc
return
}
p.decoderMu.Lock()
if p.closed.Load() {
p.decoderMu.Unlock()
_ = decoder.Close()
return
}
if p.decoder != nil {
_ = p.decoder.Close()
}
p.decoder = decoder
p.decoderMu.Unlock()
go func() {
defer func() { _ = decoder.Close() }()
defer func() {
p.decoderMu.Lock()
if p.decoder == decoder {
p.decoder = nil
}
p.decoderMu.Unlock()
_ = decoder.Close()
}()
for {
select {
case <-p.closeCh:
return
default:
}
frame, err := decoder.PopFrame()
if err != nil {
if !errors.Is(err, ErrTransportClosed) {
if !errors.Is(err, ErrTransportClosed) && !p.closed.Load() {
logger.Warnf("videochannel decoder pop error: %v", err)
}
return
@@ -384,6 +444,12 @@ func (p *streamTransport) handleRemoteTrack(track *webrtc.TrackRemote, _ *webrtc
go func() {
sb := samplebuilder.New(sampleBuilderMaxLate, codec.depacketizer(), track.Codec().ClockRate)
for {
select {
case <-p.closeCh:
return
default:
}
packet, _, err := track.ReadRTP()
if err != nil {
sb.Flush()
@@ -393,7 +459,9 @@ func (p *streamTransport) handleRemoteTrack(track *webrtc.TrackRemote, _ *webrtc
sb.Push(packet)
for sample := sb.Pop(); sample != nil; sample = sb.Pop() {
if err := decoder.PushSample(sample.Data); err != nil {
logger.Warnf("videochannel decoder push error: %v", err)
if !p.closed.Load() {
logger.Warnf("videochannel decoder push error: %v", err)
}
return
}
}