From 9a2bbfd44ec8b6ba9e2c00bd11087cadc594cbee Mon Sep 17 00:00:00 2001 From: zarazaex69 Date: Sun, 17 May 2026 05:45:43 +0300 Subject: [PATCH] feat(videochannel): add per-fragment ack tracking --- internal/transport/videochannel/fragack.go | 108 ++++++++++++++++ internal/transport/videochannel/frame.go | 34 +++-- .../videochannel/frame_extra_test.go | 4 +- internal/transport/videochannel/transport.go | 121 ++++++++++++++---- .../videochannel/transport_unit_test.go | 56 ++++++-- 5 files changed, 270 insertions(+), 53 deletions(-) create mode 100644 internal/transport/videochannel/fragack.go diff --git a/internal/transport/videochannel/fragack.go b/internal/transport/videochannel/fragack.go new file mode 100644 index 0000000..bc9629c --- /dev/null +++ b/internal/transport/videochannel/fragack.go @@ -0,0 +1,108 @@ +package videochannel + +import "sync" + +// fragAckTracker tracks per-fragment acknowledgements for in-flight Send +// calls. Each Send registers a tracker keyed by sequence number with the +// total fragment count; the receive loop calls Mark(seq, fragIdx) when an +// ack arrives. Send polls Snapshot() to see which fragments still need +// retransmission. +// +// The split from common.AckRegistry exists because video transports are +// lossy at the fragment level (each fragment is a separate VP8-encoded +// video frame that may be corrupted past QR/tile decode recovery). Whole- +// message ack semantics forced a full retransmit on any single-fragment +// loss, which under load piled fragments into the outbound channel and +// eventually killed the encoder. Per-fragment ack lets the sender retry +// only what was actually lost. +type fragAckTracker struct { + mu sync.Mutex + pending map[uint32]*fragWaiter +} + +type fragWaiter struct { + mu sync.Mutex + crc uint32 + total int + acked []bool + remaining int + notify chan struct{} +} + +func newFragAckTracker() *fragAckTracker { + return &fragAckTracker{pending: make(map[uint32]*fragWaiter)} +} + +// Register installs a waiter for (seq, crc) covering total fragments and +// returns it. The caller must drop it via Unregister. +func (t *fragAckTracker) Register(seq, crc uint32, total int) *fragWaiter { + w := &fragWaiter{ + crc: crc, + total: total, + acked: make([]bool, total), + remaining: total, + notify: make(chan struct{}, 1), + } + t.mu.Lock() + t.pending[seq] = w + t.mu.Unlock() + return w +} + +// Unregister drops the waiter for seq. +func (t *fragAckTracker) Unregister(seq uint32) { + t.mu.Lock() + delete(t.pending, seq) + t.mu.Unlock() +} + +// Mark records that fragIdx of seq is acknowledged. crc must match the +// waiter's crc, otherwise the ack is ignored (it is from an older message +// whose seq was reused). Returns true iff this call actually flipped a +// previously-unacked fragment. +func (t *fragAckTracker) Mark(seq, crc uint32, fragIdx int) bool { + t.mu.Lock() + w, ok := t.pending[seq] + t.mu.Unlock() + if !ok { + return false + } + w.mu.Lock() + if w.crc != crc || fragIdx < 0 || fragIdx >= w.total || w.acked[fragIdx] { + w.mu.Unlock() + return false + } + w.acked[fragIdx] = true + w.remaining-- + w.mu.Unlock() + select { + case w.notify <- struct{}{}: + default: + } + return true +} + +// Pending returns the indexes of fragments still unacked. +func (w *fragWaiter) Pending() []int { + w.mu.Lock() + defer w.mu.Unlock() + out := make([]int, 0, w.remaining) + for i, ok := range w.acked { + if !ok { + out = append(out, i) + } + } + return out +} + +// Done reports whether every fragment has been acked. +func (w *fragWaiter) Done() bool { + w.mu.Lock() + defer w.mu.Unlock() + return w.remaining == 0 +} + +// Notify returns the channel that ticks on every Mark. +func (w *fragWaiter) Notify() <-chan struct{} { + return w.notify +} diff --git a/internal/transport/videochannel/frame.go b/internal/transport/videochannel/frame.go index 6e28726..f60fe9e 100644 --- a/internal/transport/videochannel/frame.go +++ b/internal/transport/videochannel/frame.go @@ -7,21 +7,26 @@ import ( const ( protocolMagic uint32 = 0x4f565632 // OVV2 - protocolVersion byte = 2 + protocolVersion byte = 3 frameTypeData byte = 1 frameTypeAck byte = 2 frameRoleAny byte = 0 frameRoleServer byte = 1 frameRoleClient byte = 2 + // v3 ack frames carry fragIdx so each fragment of a multi-fragment + // payload can be acknowledged independently. Senders retransmit only + // the fragments still unacked, which restores reliability across a + // lossy QR/tile-over-VP8 link without depending on ECC settings. frameBindingOff = 7 frameSeqOff = 11 frameCRCOff = 15 - frameAckLen = 19 - frameTotalLenOff = 19 - frameFragIdxOff = 23 - frameFragTotalOff = 25 - frameDataHdrLen = 27 + frameAckFragOff = 19 + frameAckLen = 21 + frameTotalLenOff = 21 + frameFragIdxOff = 25 + frameFragTotalOff = 27 + frameDataHdrLen = 29 ) var ( @@ -51,6 +56,7 @@ type transportFrame struct { payload []byte } + func encodeDataFrameForBinding( role byte, binding uint32, @@ -65,7 +71,7 @@ func encodeDataFrameForBinding( out[6] = role binary.BigEndian.PutUint32(out[frameBindingOff:frameSeqOff], binding) binary.BigEndian.PutUint32(out[frameSeqOff:frameCRCOff], seq) - binary.BigEndian.PutUint32(out[frameCRCOff:frameAckLen], crc) + binary.BigEndian.PutUint32(out[frameCRCOff:frameAckFragOff], crc) binary.BigEndian.PutUint32(out[frameTotalLenOff:frameFragIdxOff], uint32(totalLen)) //nolint:gosec,lll // G115: bounded conversion verified by surrounding logic binary.BigEndian.PutUint16(out[frameFragIdxOff:frameFragTotalOff], uint16(fragIdx)) //nolint:gosec,lll // G115: bounded conversion verified by surrounding logic binary.BigEndian.PutUint16(out[frameFragTotalOff:frameDataHdrLen], uint16(fragTotal)) //nolint:gosec,lll // G115: bounded conversion verified by surrounding logic @@ -73,11 +79,11 @@ func encodeDataFrameForBinding( return out } -func encodeAckFrame(seq, crc uint32) []byte { - return encodeAckFrameForBinding(frameRoleAny, 0, seq, crc) +func encodeAckFrame(seq, crc uint32, fragIdx uint16) []byte { + return encodeAckFrameForBinding(frameRoleAny, 0, seq, crc, fragIdx) } -func encodeAckFrameForBinding(role byte, binding, seq, crc uint32) []byte { +func encodeAckFrameForBinding(role byte, binding, seq, crc uint32, fragIdx uint16) []byte { out := make([]byte, frameAckLen) binary.BigEndian.PutUint32(out[0:4], protocolMagic) out[4] = protocolVersion @@ -85,7 +91,8 @@ func encodeAckFrameForBinding(role byte, binding, seq, crc uint32) []byte { out[6] = role binary.BigEndian.PutUint32(out[frameBindingOff:frameSeqOff], binding) binary.BigEndian.PutUint32(out[frameSeqOff:frameCRCOff], seq) - binary.BigEndian.PutUint32(out[frameCRCOff:frameAckLen], crc) + binary.BigEndian.PutUint32(out[frameCRCOff:frameAckFragOff], crc) + binary.BigEndian.PutUint16(out[frameAckFragOff:frameAckLen], fragIdx) return out } @@ -140,7 +147,8 @@ func decodeAckBody(frame transportFrame, data []byte) (transportFrame, error) { return transportFrame{}, ErrAckTooShort } frame.seq = binary.BigEndian.Uint32(data[frameSeqOff:frameCRCOff]) - frame.crc = binary.BigEndian.Uint32(data[frameCRCOff:frameAckLen]) + frame.crc = binary.BigEndian.Uint32(data[frameCRCOff:frameAckFragOff]) + frame.fragIdx = binary.BigEndian.Uint16(data[frameAckFragOff:frameAckLen]) return frame, nil } @@ -149,7 +157,7 @@ func decodeDataBody(frame transportFrame, data []byte) (transportFrame, error) { return transportFrame{}, ErrDataTooShort } frame.seq = binary.BigEndian.Uint32(data[frameSeqOff:frameCRCOff]) - frame.crc = binary.BigEndian.Uint32(data[frameCRCOff:frameAckLen]) + frame.crc = binary.BigEndian.Uint32(data[frameCRCOff:frameAckFragOff]) frame.totalLen = binary.BigEndian.Uint32(data[frameTotalLenOff:frameFragIdxOff]) frame.fragIdx = binary.BigEndian.Uint16(data[frameFragIdxOff:frameFragTotalOff]) frame.fragTotal = binary.BigEndian.Uint16(data[frameFragTotalOff:frameDataHdrLen]) diff --git a/internal/transport/videochannel/frame_extra_test.go b/internal/transport/videochannel/frame_extra_test.go index 5df86f3..bb27231 100644 --- a/internal/transport/videochannel/frame_extra_test.go +++ b/internal/transport/videochannel/frame_extra_test.go @@ -34,11 +34,11 @@ func TestDecodeTransportFrameErrorsAndAck(t *testing.T) { } } - ack, err := decodeTransportFrame(encodeAckFrame(7, 0x1234)) + ack, err := decodeTransportFrame(encodeAckFrame(7, 0x1234, 5)) if err != nil { t.Fatalf("decode ack error = %v", err) } - if ack.typ != frameTypeAck || ack.seq != 7 || ack.crc != 0x1234 { + if ack.typ != frameTypeAck || ack.seq != 7 || ack.crc != 0x1234 || ack.fragIdx != 5 { t.Fatalf("ack = %+v", ack) } } diff --git a/internal/transport/videochannel/transport.go b/internal/transport/videochannel/transport.go index 44fbb60..08ccd79 100644 --- a/internal/transport/videochannel/transport.go +++ b/internal/transport/videochannel/transport.go @@ -71,7 +71,7 @@ type streamTransport struct { writerUp atomic.Bool sendMu sync.Mutex startWriter sync.Once - acks *common.AckRegistry + fragAcks *fragAckTracker reassembler *common.Reassembler videoW int videoH int @@ -157,7 +157,7 @@ func New(ctx context.Context, cfg transport.Config) (transport.Transport, error) closeCh: make(chan struct{}), writerDone: make(chan struct{}), decoders: make(map[*ffmpegDecoder]struct{}), - acks: common.NewAckRegistry(), + fragAcks: newFragAckTracker(), reassembler: common.NewReassembler(256), videoW: opts.Width, videoH: opts.Height, @@ -218,7 +218,14 @@ func (p *streamTransport) Connect(ctx context.Context) error { return nil } -// Send transmits data through the transport. +// Send transmits data through the transport with per-fragment retransmits. +// +// QR/tile-encoded fragments ride lossy VP8 video frames where any single +// fragment can be corrupted past ECC recovery. With whole-message ack +// semantics a single dropped fragment forced a full retransmit; under +// load that piled fragments into the outbound channel and eventually +// killed the encoder. Here each fragment is acked independently and only +// the unacked ones are resent. func (p *streamTransport) Send(data []byte) error { if p.closed.Load() { return ErrTransportClosed @@ -230,34 +237,86 @@ func (p *streamTransport) Send(data []byte) error { seq := p.nextSeq.Add(1) crc := crc32.ChecksumIEEE(data) fragments := common.FragmentPayload(data, p.videoQRSize) - waiter := p.acks.Register(seq) - defer p.acks.Unregister(seq) + waiter := p.fragAcks.Register(seq, crc, len(fragments)) + defer p.fragAcks.Unregister(seq) + + // Per-attempt wait covers one round trip through the FPS-paced writer + // and the peer's reassembly + ack path. Scale with fragment count so a + // large payload gets enough time to drain on the first attempt before + // we retransmit anything. + ackTimeout := perAttemptAckTimeout(len(fragments), p.videoFPS) + + // Initial send: every fragment goes out once. + pending := make([]int, len(fragments)) + for i := range pending { + pending[i] = i + } for range maxSendAttempts { - for idx, fragment := range fragments { - frame := encodeDataFrameForBinding(p.localRole, p.bindingToken, seq, crc, len(data), idx, len(fragments), fragment) + for _, idx := range pending { + frame := encodeDataFrameForBinding( + p.localRole, p.bindingToken, seq, crc, + len(data), idx, len(fragments), fragments[idx]) if err := p.enqueueFrame(frame, false); err != nil { return err } } - timer := time.NewTimer(defaultAckTimeout) - select { - case ackCRC := <-waiter: - timer.Stop() - if ackCRC == crc { - return nil - } - case <-timer.C: - case <-p.closeCh: - timer.Stop() - return ErrTransportClosed + if ok, err := p.awaitFragments(waiter, ackTimeout); err != nil { + return err + } else if ok { + return nil + } + pending = waiter.Pending() + if len(pending) == 0 { + return nil } } return ErrAckTimeout } +// awaitFragments blocks until the waiter is fully acked, the per-attempt +// timeout elapses, or the transport closes. Returns (done, err). +func (p *streamTransport) awaitFragments(waiter *fragWaiter, timeout time.Duration) (bool, error) { + timer := time.NewTimer(timeout) + defer timer.Stop() + for { + if waiter.Done() { + return true, nil + } + select { + case <-waiter.Notify(): + // Re-check Done() at the top of the loop. + case <-timer.C: + return waiter.Done(), nil + case <-p.closeCh: + return false, ErrTransportClosed + } + } +} + +// perAttemptAckTimeout returns how long to wait for acks of a multi-fragment +// payload before retransmitting unacked fragments. Floor at defaultAckTimeout +// for tiny payloads; otherwise scale linearly with fragment count to cover +// one round trip through the FPS-paced writerLoop plus reassembly on the peer +// side, with a 3× margin. +func perAttemptAckTimeout(fragments, fps int) time.Duration { + if fps <= 0 { + fps = 25 + } + frameInterval := time.Second / time.Duration(fps) + estimated := time.Duration(fragments) * frameInterval * 3 + if estimated < defaultAckTimeout { + return defaultAckTimeout + } + const maxAckTimeout = 30 * time.Second + if estimated > maxAckTimeout { + return maxAckTimeout + } + return estimated +} + // Close terminates the transport. func (p *streamTransport) Close() error { if p.closed.CompareAndSwap(false, true) { @@ -559,7 +618,7 @@ func (p *streamTransport) handleFrame(frame []byte) { switch decoded.typ { case frameTypeAck: - p.resolveAck(decoded.seq, decoded.crc) + p.resolveAck(decoded.seq, decoded.crc, decoded.fragIdx) case frameTypeData: p.handleInboundFrame(decoded) } @@ -575,24 +634,30 @@ func (p *streamTransport) handleInboundFrame(frame transportFrame) { Payload: frame.payload, }) switch result { - case common.ResultDuplicate: - p.sendAck(frame.seq, frame.crc) case common.ResultDelivered: if p.onData != nil { p.onData(data) } - p.sendAck(frame.seq, frame.crc) - case common.ResultPartial, common.ResultIgnore: - // fragment stored or discarded; no peer response needed yet. + // All fragments of this seq are in; ack this fragment. The sender + // learns full delivery once it has accumulated acks for every + // fragment it sent. + p.sendAck(frame.seq, frame.crc, frame.fragIdx) + case common.ResultPartial, common.ResultDuplicate: + // Every fragment we successfully decoded gets acked, including + // duplicates — under retransmits the sender may have lost the + // earlier ack and is waiting on this one. + p.sendAck(frame.seq, frame.crc, frame.fragIdx) + case common.ResultIgnore: + // Malformed or out-of-range; no ack. } } -func (p *streamTransport) sendAck(seq, crc uint32) { - _ = p.enqueueFrame(encodeAckFrameForBinding(p.localRole, p.bindingToken, seq, crc), true) +func (p *streamTransport) sendAck(seq, crc uint32, fragIdx uint16) { + _ = p.enqueueFrame(encodeAckFrameForBinding(p.localRole, p.bindingToken, seq, crc, fragIdx), true) } -func (p *streamTransport) resolveAck(seq, crc uint32) { - p.acks.Resolve(seq, crc) +func (p *streamTransport) resolveAck(seq, crc uint32, fragIdx uint16) { + p.fragAcks.Mark(seq, crc, int(fragIdx)) } func localFrameRole(deviceID string) byte { diff --git a/internal/transport/videochannel/transport_unit_test.go b/internal/transport/videochannel/transport_unit_test.go index 623f9f9..837eafd 100644 --- a/internal/transport/videochannel/transport_unit_test.go +++ b/internal/transport/videochannel/transport_unit_test.go @@ -10,7 +10,6 @@ import ( "github.com/openlibrecommunity/olcrtc/internal/engine" enginebuiltin "github.com/openlibrecommunity/olcrtc/internal/engine/builtin" "github.com/openlibrecommunity/olcrtc/internal/transport" - "github.com/openlibrecommunity/olcrtc/internal/transport/common" "github.com/pion/webrtc/v4" ) @@ -156,23 +155,30 @@ func TestSendAckAndClosePaths(t *testing.T) { outboundAck: make(chan []byte, 8), closeCh: make(chan struct{}), writerDone: make(chan struct{}), - acks: common.NewAckRegistry(), + fragAcks: newFragAckTracker(), videoQRSize: 4, } + // "payload" = 7 bytes; with qrSize=4 -> two fragments. Send returns + // only after both fragIdx 0 and 1 have been acked. done := make(chan error, 1) payload := []byte("payload") go func() { done <- tr.Send(payload) }() - select { - case frame := <-tr.outbound: - decoded, err := decodeTransportFrame(frame) - if err != nil { - t.Fatalf("decodeTransportFrame() error = %v", err) + wantCRC := crc32.ChecksumIEEE(payload) + seen := 0 + for seen < 2 { + select { + case frame := <-tr.outbound: + decoded, err := decodeTransportFrame(frame) + if err != nil { + t.Fatalf("decodeTransportFrame() error = %v", err) + } + tr.resolveAck(decoded.seq, wantCRC, decoded.fragIdx) + seen++ + case <-time.After(time.Second): + t.Fatalf("Send() did not enqueue fragment %d", seen) } - tr.resolveAck(decoded.seq, crc32.ChecksumIEEE(payload)) - case <-time.After(time.Second): - t.Fatal("Send() did not enqueue frame") } if err := <-done; err != nil { @@ -240,6 +246,36 @@ func TestOutboundPriorityRenderAndClosedEnqueue(t *testing.T) { } } +// TestPerAttemptAckTimeoutScalesWithFragments locks in the rule that the +// per-attempt ack budget covers a full FPS-paced round trip through every +// fragment. Without this, multi-fragment payloads trigger premature +// retransmits that pile fragments into the outbound channel and starve +// the ffmpeg encoder until it is killed. +func TestPerAttemptAckTimeoutScalesWithFragments(t *testing.T) { + // Tiny payload: floor at defaultAckTimeout. + if got := perAttemptAckTimeout(1, 25); got != defaultAckTimeout { + t.Fatalf("perAttemptAckTimeout(1,25) = %v, want %v", got, defaultAckTimeout) + } + if got := perAttemptAckTimeout(2, 25); got != defaultAckTimeout { + t.Fatalf("perAttemptAckTimeout(2,25) = %v, want %v", got, defaultAckTimeout) + } + + // 16 fragments @ 25 FPS: 16 * 40ms * 3 = 1920ms. + if got, want := perAttemptAckTimeout(16, 25), 1920*time.Millisecond; got != want { + t.Fatalf("perAttemptAckTimeout(16,25) = %v, want %v", got, want) + } + + // Large payload caps at 30s. + if got, want := perAttemptAckTimeout(10000, 25), 30*time.Second; got != want { + t.Fatalf("perAttemptAckTimeout(10000,25) = %v, want %v", got, want) + } + + // Zero/negative fps falls back to 25 FPS default. + if got := perAttemptAckTimeout(1, 0); got != defaultAckTimeout { + t.Fatalf("perAttemptAckTimeout(1,0) = %v, want %v", got, defaultAckTimeout) + } +} + func TestNextOutboundFrameStopsWhenClosed(t *testing.T) { tr := &streamTransport{ outbound: make(chan []byte, 1),