feat(videochannel): add per-fragment ack tracking

This commit is contained in:
zarazaex69
2026-05-17 05:45:43 +03:00
parent 33cccbc906
commit 9a2bbfd44e
5 changed files with 270 additions and 53 deletions

View File

@@ -0,0 +1,108 @@
package videochannel
import "sync"
// fragAckTracker tracks per-fragment acknowledgements for in-flight Send
// calls. Each Send registers a waiter keyed by sequence number, together
// with the payload CRC and the total fragment count; the receive path calls
// Mark(seq, crc, fragIdx) when an ack arrives. Send calls Pending() on the
// returned waiter to see which fragments still need retransmission.
//
// The split from common.AckRegistry exists because video transports are
// lossy at the fragment level (each fragment is a separate VP8-encoded
// video frame that may be corrupted past QR/tile decode recovery). Whole-
// message ack semantics forced a full retransmit on any single-fragment
// loss, which under load piled fragments into the outbound channel and
// eventually killed the encoder. Per-fragment ack lets the sender retry
// only what was actually lost.
type fragAckTracker struct {
// mu guards pending.
mu sync.Mutex
// pending maps a sequence number to the waiter registered for it.
pending map[uint32]*fragWaiter
}
// fragWaiter holds the acknowledgement state of a single registered
// message: which of its fragments have been acked and how many are left.
type fragWaiter struct {
// mu guards acked and remaining. crc and total are set when the waiter
// is created and are not modified afterwards in this file.
mu sync.Mutex
// crc is the checksum supplied at registration; acks carrying a
// different crc are ignored by the tracker.
crc uint32
// total is the number of fragments the message was split into.
total int
// acked[i] is true once fragment i has been acknowledged.
acked []bool
// remaining is the number of fragments not yet acknowledged.
remaining int
// notify receives a non-blocking signal each time a fragment is newly
// acknowledged. It is created with a buffer of one, so several signals
// may coalesce into a single receive.
notify chan struct{}
}
// newFragAckTracker returns an empty tracker whose pending map is
// initialised and ready for Register calls.
func newFragAckTracker() *fragAckTracker {
	tracker := new(fragAckTracker)
	tracker.pending = map[uint32]*fragWaiter{}
	return tracker
}
// Register installs a waiter for (seq, crc) covering total fragments and
// returns it. Any waiter already stored under seq is replaced. The caller
// must drop it via Unregister.
//
// A negative total is treated as zero so that a bad fragment count cannot
// panic when the acked slice is allocated; a waiter covering zero fragments
// has nothing remaining and therefore reports Done immediately.
func (t *fragAckTracker) Register(seq, crc uint32, total int) *fragWaiter {
	if total < 0 {
		total = 0
	}
	w := &fragWaiter{
		crc:       crc,
		total:     total,
		acked:     make([]bool, total),
		remaining: total,
		// One-slot buffer: Mark signals without blocking and repeated
		// signals coalesce until the waiter drains the channel.
		notify: make(chan struct{}, 1),
	}
	t.mu.Lock()
	t.pending[seq] = w
	t.mu.Unlock()
	return w
}
// Unregister removes whatever waiter is stored under seq; it is a no-op
// when nothing is registered for that sequence number.
func (t *fragAckTracker) Unregister(seq uint32) {
	t.mu.Lock()
	defer t.mu.Unlock()

	delete(t.pending, seq)
}
// Mark records an acknowledgement for fragment fragIdx of message seq.
//
// The ack is ignored (and false returned) when no waiter is registered for
// seq, when crc differs from the waiter's crc (an ack for an older message
// whose seq was reused), when fragIdx is out of range, or when the fragment
// was already acked. Otherwise the fragment is flipped to acked, the waiter
// is signalled without blocking, and true is returned.
func (t *fragAckTracker) Mark(seq, crc uint32, fragIdx int) bool {
	t.mu.Lock()
	waiter, found := t.pending[seq]
	t.mu.Unlock()
	if !found {
		return false
	}

	// The tracker lock is already released; only the waiter's own lock is
	// held while its bookkeeping is updated.
	flipped := func() bool {
		waiter.mu.Lock()
		defer waiter.mu.Unlock()

		switch {
		case waiter.crc != crc:
			return false
		case fragIdx < 0, fragIdx >= waiter.total:
			return false
		case waiter.acked[fragIdx]:
			return false
		}
		waiter.acked[fragIdx] = true
		waiter.remaining--
		return true
	}()
	if !flipped {
		return false
	}

	// Non-blocking wake-up: if a signal is already queued the waiter will
	// re-check its state anyway, so dropping this one is harmless.
	select {
	case waiter.notify <- struct{}{}:
	default:
	}
	return true
}
// Pending returns, in ascending order, the indexes of the fragments that
// have not been acknowledged yet. The result is a fresh, non-nil slice.
func (w *fragWaiter) Pending() []int {
	w.mu.Lock()
	defer w.mu.Unlock()

	missing := make([]int, 0, w.remaining)
	for idx := 0; idx < len(w.acked); idx++ {
		if w.acked[idx] {
			continue
		}
		missing = append(missing, idx)
	}
	return missing
}
// Done reports whether no fragments remain unacknowledged.
func (w *fragWaiter) Done() bool {
	w.mu.Lock()
	left := w.remaining
	w.mu.Unlock()

	return left == 0
}
// Notify returns the receive side of the waiter's notify channel. A signal
// is posted without blocking when a fragment is newly acknowledged; because
// the send is non-blocking, a signal that finds the channel already full is
// dropped, so one receive may stand for several acks. Receivers should
// re-check Done or Pending after each wake-up rather than count signals.
func (w *fragWaiter) Notify() <-chan struct{} {
return w.notify
}

View File

@@ -7,21 +7,26 @@ import (
const (
protocolMagic uint32 = 0x4f565632 // OVV2
protocolVersion byte = 2
protocolVersion byte = 3
frameTypeData byte = 1
frameTypeAck byte = 2
frameRoleAny byte = 0
frameRoleServer byte = 1
frameRoleClient byte = 2
// v3 ack frames carry fragIdx so each fragment of a multi-fragment
// payload can be acknowledged independently. Senders retransmit only
// the fragments still unacked, which restores reliability across a
// lossy QR/tile-over-VP8 link without depending on ECC settings.
frameBindingOff = 7
frameSeqOff = 11
frameCRCOff = 15
frameAckLen = 19
frameTotalLenOff = 19
frameFragIdxOff = 23
frameFragTotalOff = 25
frameDataHdrLen = 27
frameAckFragOff = 19
frameAckLen = 21
frameTotalLenOff = 21
frameFragIdxOff = 25
frameFragTotalOff = 27
frameDataHdrLen = 29
)
var (
@@ -51,6 +56,7 @@ type transportFrame struct {
payload []byte
}
func encodeDataFrameForBinding(
role byte,
binding uint32,
@@ -65,7 +71,7 @@ func encodeDataFrameForBinding(
out[6] = role
binary.BigEndian.PutUint32(out[frameBindingOff:frameSeqOff], binding)
binary.BigEndian.PutUint32(out[frameSeqOff:frameCRCOff], seq)
binary.BigEndian.PutUint32(out[frameCRCOff:frameAckLen], crc)
binary.BigEndian.PutUint32(out[frameCRCOff:frameAckFragOff], crc)
binary.BigEndian.PutUint32(out[frameTotalLenOff:frameFragIdxOff], uint32(totalLen)) //nolint:gosec,lll // G115: bounded conversion verified by surrounding logic
binary.BigEndian.PutUint16(out[frameFragIdxOff:frameFragTotalOff], uint16(fragIdx)) //nolint:gosec,lll // G115: bounded conversion verified by surrounding logic
binary.BigEndian.PutUint16(out[frameFragTotalOff:frameDataHdrLen], uint16(fragTotal)) //nolint:gosec,lll // G115: bounded conversion verified by surrounding logic
@@ -73,11 +79,11 @@ func encodeDataFrameForBinding(
return out
}
func encodeAckFrame(seq, crc uint32) []byte {
return encodeAckFrameForBinding(frameRoleAny, 0, seq, crc)
func encodeAckFrame(seq, crc uint32, fragIdx uint16) []byte {
return encodeAckFrameForBinding(frameRoleAny, 0, seq, crc, fragIdx)
}
func encodeAckFrameForBinding(role byte, binding, seq, crc uint32) []byte {
func encodeAckFrameForBinding(role byte, binding, seq, crc uint32, fragIdx uint16) []byte {
out := make([]byte, frameAckLen)
binary.BigEndian.PutUint32(out[0:4], protocolMagic)
out[4] = protocolVersion
@@ -85,7 +91,8 @@ func encodeAckFrameForBinding(role byte, binding, seq, crc uint32) []byte {
out[6] = role
binary.BigEndian.PutUint32(out[frameBindingOff:frameSeqOff], binding)
binary.BigEndian.PutUint32(out[frameSeqOff:frameCRCOff], seq)
binary.BigEndian.PutUint32(out[frameCRCOff:frameAckLen], crc)
binary.BigEndian.PutUint32(out[frameCRCOff:frameAckFragOff], crc)
binary.BigEndian.PutUint16(out[frameAckFragOff:frameAckLen], fragIdx)
return out
}
@@ -140,7 +147,8 @@ func decodeAckBody(frame transportFrame, data []byte) (transportFrame, error) {
return transportFrame{}, ErrAckTooShort
}
frame.seq = binary.BigEndian.Uint32(data[frameSeqOff:frameCRCOff])
frame.crc = binary.BigEndian.Uint32(data[frameCRCOff:frameAckLen])
frame.crc = binary.BigEndian.Uint32(data[frameCRCOff:frameAckFragOff])
frame.fragIdx = binary.BigEndian.Uint16(data[frameAckFragOff:frameAckLen])
return frame, nil
}
@@ -149,7 +157,7 @@ func decodeDataBody(frame transportFrame, data []byte) (transportFrame, error) {
return transportFrame{}, ErrDataTooShort
}
frame.seq = binary.BigEndian.Uint32(data[frameSeqOff:frameCRCOff])
frame.crc = binary.BigEndian.Uint32(data[frameCRCOff:frameAckLen])
frame.crc = binary.BigEndian.Uint32(data[frameCRCOff:frameAckFragOff])
frame.totalLen = binary.BigEndian.Uint32(data[frameTotalLenOff:frameFragIdxOff])
frame.fragIdx = binary.BigEndian.Uint16(data[frameFragIdxOff:frameFragTotalOff])
frame.fragTotal = binary.BigEndian.Uint16(data[frameFragTotalOff:frameDataHdrLen])

View File

@@ -34,11 +34,11 @@ func TestDecodeTransportFrameErrorsAndAck(t *testing.T) {
}
}
ack, err := decodeTransportFrame(encodeAckFrame(7, 0x1234))
ack, err := decodeTransportFrame(encodeAckFrame(7, 0x1234, 5))
if err != nil {
t.Fatalf("decode ack error = %v", err)
}
if ack.typ != frameTypeAck || ack.seq != 7 || ack.crc != 0x1234 {
if ack.typ != frameTypeAck || ack.seq != 7 || ack.crc != 0x1234 || ack.fragIdx != 5 {
t.Fatalf("ack = %+v", ack)
}
}

View File

@@ -71,7 +71,7 @@ type streamTransport struct {
writerUp atomic.Bool
sendMu sync.Mutex
startWriter sync.Once
acks *common.AckRegistry
fragAcks *fragAckTracker
reassembler *common.Reassembler
videoW int
videoH int
@@ -157,7 +157,7 @@ func New(ctx context.Context, cfg transport.Config) (transport.Transport, error)
closeCh: make(chan struct{}),
writerDone: make(chan struct{}),
decoders: make(map[*ffmpegDecoder]struct{}),
acks: common.NewAckRegistry(),
fragAcks: newFragAckTracker(),
reassembler: common.NewReassembler(256),
videoW: opts.Width,
videoH: opts.Height,
@@ -218,7 +218,14 @@ func (p *streamTransport) Connect(ctx context.Context) error {
return nil
}
// Send transmits data through the transport.
// Send transmits data through the transport with per-fragment retransmits.
//
// QR/tile-encoded fragments ride lossy VP8 video frames where any single
// fragment can be corrupted past ECC recovery. With whole-message ack
// semantics a single dropped fragment forced a full retransmit; under
// load that piled fragments into the outbound channel and eventually
// killed the encoder. Here each fragment is acked independently and only
// the unacked ones are resent.
func (p *streamTransport) Send(data []byte) error {
if p.closed.Load() {
return ErrTransportClosed
@@ -230,34 +237,86 @@ func (p *streamTransport) Send(data []byte) error {
seq := p.nextSeq.Add(1)
crc := crc32.ChecksumIEEE(data)
fragments := common.FragmentPayload(data, p.videoQRSize)
waiter := p.acks.Register(seq)
defer p.acks.Unregister(seq)
waiter := p.fragAcks.Register(seq, crc, len(fragments))
defer p.fragAcks.Unregister(seq)
// Per-attempt wait covers one round trip through the FPS-paced writer
// and the peer's reassembly + ack path. Scale with fragment count so a
// large payload gets enough time to drain on the first attempt before
// we retransmit anything.
ackTimeout := perAttemptAckTimeout(len(fragments), p.videoFPS)
// Initial send: every fragment goes out once.
pending := make([]int, len(fragments))
for i := range pending {
pending[i] = i
}
for range maxSendAttempts {
for idx, fragment := range fragments {
frame := encodeDataFrameForBinding(p.localRole, p.bindingToken, seq, crc, len(data), idx, len(fragments), fragment)
for _, idx := range pending {
frame := encodeDataFrameForBinding(
p.localRole, p.bindingToken, seq, crc,
len(data), idx, len(fragments), fragments[idx])
if err := p.enqueueFrame(frame, false); err != nil {
return err
}
}
timer := time.NewTimer(defaultAckTimeout)
select {
case ackCRC := <-waiter:
timer.Stop()
if ackCRC == crc {
return nil
}
case <-timer.C:
case <-p.closeCh:
timer.Stop()
return ErrTransportClosed
if ok, err := p.awaitFragments(waiter, ackTimeout); err != nil {
return err
} else if ok {
return nil
}
pending = waiter.Pending()
if len(pending) == 0 {
return nil
}
}
return ErrAckTimeout
}
// awaitFragments waits for one retransmission attempt to complete.
//
// It returns (true, nil) as soon as the waiter reports Done. If timeout
// elapses first it returns the waiter's Done state at that instant with a
// nil error, and if the transport's close channel fires it returns
// (false, ErrTransportClosed).
func (p *streamTransport) awaitFragments(waiter *fragWaiter, timeout time.Duration) (bool, error) {
	deadline := time.NewTimer(timeout)
	defer deadline.Stop()

	for !waiter.Done() {
		select {
		case <-waiter.Notify():
			// Progress was made; the loop condition re-evaluates Done.
		case <-deadline.C:
			// Final check so an ack that raced the timer still counts.
			return waiter.Done(), nil
		case <-p.closeCh:
			return false, ErrTransportClosed
		}
	}
	return true, nil
}
// perAttemptAckTimeout computes the per-attempt wait for acks of a payload
// split into the given number of fragments, before unacked fragments are
// retransmitted.
//
// The budget is fragments × one frame interval at fps × 3. A non-positive
// fps is replaced by 25. Results below defaultAckTimeout are raised to
// defaultAckTimeout; results above 30 seconds are capped at 30 seconds.
func perAttemptAckTimeout(fragments, fps int) time.Duration {
	const (
		fallbackFPS   = 25
		safetyFactor  = 3
		maxAckTimeout = 30 * time.Second
	)

	if fps <= 0 {
		fps = fallbackFPS
	}
	perFrame := time.Second / time.Duration(fps)
	budget := time.Duration(fragments) * perFrame * safetyFactor

	switch {
	case budget < defaultAckTimeout:
		return defaultAckTimeout
	case budget > maxAckTimeout:
		return maxAckTimeout
	default:
		return budget
	}
}
// Close terminates the transport.
func (p *streamTransport) Close() error {
if p.closed.CompareAndSwap(false, true) {
@@ -559,7 +618,7 @@ func (p *streamTransport) handleFrame(frame []byte) {
switch decoded.typ {
case frameTypeAck:
p.resolveAck(decoded.seq, decoded.crc)
p.resolveAck(decoded.seq, decoded.crc, decoded.fragIdx)
case frameTypeData:
p.handleInboundFrame(decoded)
}
@@ -575,24 +634,30 @@ func (p *streamTransport) handleInboundFrame(frame transportFrame) {
Payload: frame.payload,
})
switch result {
case common.ResultDuplicate:
p.sendAck(frame.seq, frame.crc)
case common.ResultDelivered:
if p.onData != nil {
p.onData(data)
}
p.sendAck(frame.seq, frame.crc)
case common.ResultPartial, common.ResultIgnore:
// fragment stored or discarded; no peer response needed yet.
// All fragments of this seq are in; ack this fragment. The sender
// learns full delivery once it has accumulated acks for every
// fragment it sent.
p.sendAck(frame.seq, frame.crc, frame.fragIdx)
case common.ResultPartial, common.ResultDuplicate:
// Every fragment we successfully decoded gets acked, including
// duplicates — under retransmits the sender may have lost the
// earlier ack and is waiting on this one.
p.sendAck(frame.seq, frame.crc, frame.fragIdx)
case common.ResultIgnore:
// Malformed or out-of-range; no ack.
}
}
func (p *streamTransport) sendAck(seq, crc uint32) {
_ = p.enqueueFrame(encodeAckFrameForBinding(p.localRole, p.bindingToken, seq, crc), true)
func (p *streamTransport) sendAck(seq, crc uint32, fragIdx uint16) {
_ = p.enqueueFrame(encodeAckFrameForBinding(p.localRole, p.bindingToken, seq, crc, fragIdx), true)
}
func (p *streamTransport) resolveAck(seq, crc uint32) {
p.acks.Resolve(seq, crc)
func (p *streamTransport) resolveAck(seq, crc uint32, fragIdx uint16) {
p.fragAcks.Mark(seq, crc, int(fragIdx))
}
func localFrameRole(deviceID string) byte {

View File

@@ -10,7 +10,6 @@ import (
"github.com/openlibrecommunity/olcrtc/internal/engine"
enginebuiltin "github.com/openlibrecommunity/olcrtc/internal/engine/builtin"
"github.com/openlibrecommunity/olcrtc/internal/transport"
"github.com/openlibrecommunity/olcrtc/internal/transport/common"
"github.com/pion/webrtc/v4"
)
@@ -156,23 +155,30 @@ func TestSendAckAndClosePaths(t *testing.T) {
outboundAck: make(chan []byte, 8),
closeCh: make(chan struct{}),
writerDone: make(chan struct{}),
acks: common.NewAckRegistry(),
fragAcks: newFragAckTracker(),
videoQRSize: 4,
}
// "payload" = 7 bytes; with qrSize=4 -> two fragments. Send returns
// only after both fragIdx 0 and 1 have been acked.
done := make(chan error, 1)
payload := []byte("payload")
go func() { done <- tr.Send(payload) }()
select {
case frame := <-tr.outbound:
decoded, err := decodeTransportFrame(frame)
if err != nil {
t.Fatalf("decodeTransportFrame() error = %v", err)
wantCRC := crc32.ChecksumIEEE(payload)
seen := 0
for seen < 2 {
select {
case frame := <-tr.outbound:
decoded, err := decodeTransportFrame(frame)
if err != nil {
t.Fatalf("decodeTransportFrame() error = %v", err)
}
tr.resolveAck(decoded.seq, wantCRC, decoded.fragIdx)
seen++
case <-time.After(time.Second):
t.Fatalf("Send() did not enqueue fragment %d", seen)
}
tr.resolveAck(decoded.seq, crc32.ChecksumIEEE(payload))
case <-time.After(time.Second):
t.Fatal("Send() did not enqueue frame")
}
if err := <-done; err != nil {
@@ -240,6 +246,36 @@ func TestOutboundPriorityRenderAndClosedEnqueue(t *testing.T) {
}
}
// TestPerAttemptAckTimeoutScalesWithFragments locks in the rule that the
// per-attempt ack budget covers a full FPS-paced round trip through every
// fragment. Without this, multi-fragment payloads trigger premature
// retransmits that pile fragments into the outbound channel and starve
// the ffmpeg encoder until it is killed.
func TestPerAttemptAckTimeoutScalesWithFragments(t *testing.T) {
// Tiny payload: floor at defaultAckTimeout.
if got := perAttemptAckTimeout(1, 25); got != defaultAckTimeout {
t.Fatalf("perAttemptAckTimeout(1,25) = %v, want %v", got, defaultAckTimeout)
}
if got := perAttemptAckTimeout(2, 25); got != defaultAckTimeout {
t.Fatalf("perAttemptAckTimeout(2,25) = %v, want %v", got, defaultAckTimeout)
}
// 16 fragments @ 25 FPS: 16 * 40ms * 3 = 1920ms.
if got, want := perAttemptAckTimeout(16, 25), 1920*time.Millisecond; got != want {
t.Fatalf("perAttemptAckTimeout(16,25) = %v, want %v", got, want)
}
// Large payload caps at 30s.
if got, want := perAttemptAckTimeout(10000, 25), 30*time.Second; got != want {
t.Fatalf("perAttemptAckTimeout(10000,25) = %v, want %v", got, want)
}
// Zero/negative fps falls back to 25 FPS default.
if got := perAttemptAckTimeout(1, 0); got != defaultAckTimeout {
t.Fatalf("perAttemptAckTimeout(1,0) = %v, want %v", got, defaultAckTimeout)
}
}
func TestNextOutboundFrameStopsWhenClosed(t *testing.T) {
tr := &streamTransport{
outbound: make(chan []byte, 1),