feat(server,peer): Improve queue and buffer management with timeouts

This commit is contained in:
zarazaex69
2026-04-09 20:40:02 +03:00
parent 6d2e6bbb6e
commit 90ffe72e10
2 changed files with 49 additions and 30 deletions

View File

@@ -200,6 +200,9 @@ func (s *Server) onData(data []byte) {
}
func (s *Server) run(ctx context.Context) error {
ticker := time.NewTicker(10 * time.Millisecond)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
@@ -220,7 +223,8 @@ func (s *Server) run(ctx context.Context) error {
log.Println("All peers closed")
return nil
default:
case <-ticker.C:
}
sids := s.mux.GetStreams()

View File

@@ -62,7 +62,7 @@ func NewPeer(roomURL, name string, onData func([]byte)) (*Peer, error) {
reconnectCh: make(chan struct{}, 1),
closeCh: make(chan struct{}),
keepAliveCh: make(chan struct{}),
sendQueue: make(chan []byte, 1000),
sendQueue: make(chan []byte, 5000),
}, nil
}
@@ -209,16 +209,16 @@ func (p *Peer) Send(data []byte) error {
}
queueLen := len(p.sendQueue)
if queueLen > 100 {
if queueLen > 500 {
log.Printf("[SEND_QUEUE] Queue length: %d (high!)", queueLen)
}
select {
case p.sendQueue <- data:
return nil
default:
log.Printf("[SEND_QUEUE] Queue full! Dropping packet of %d bytes", len(data))
return fmt.Errorf("send queue full")
case <-time.After(100 * time.Millisecond):
log.Printf("[SEND_QUEUE] Queue timeout! Dropping packet of %d bytes (queue_len=%d)", len(data), queueLen)
return fmt.Errorf("send queue timeout")
}
}
@@ -718,42 +718,57 @@ func (p *Peer) processSendQueue() {
if p.dc != nil {
buffered = p.dc.BufferedAmount()
}
if queueLen > 50 || buffered > 100*1024 {
if queueLen > 100 || buffered > 100*1024 {
log.Printf("[QUEUE_MONITOR] queue_len=%d dc_buffered=%d", queueLen, buffered)
}
case data := <-p.sendQueue:
queueStart := time.Now()
buffered := uint64(0)
if p.dc != nil {
buffered = p.dc.BufferedAmount()
if p.dc == nil || p.dc.ReadyState() != webrtc.DataChannelStateOpen {
continue
}
if buffered > 256*1024 {
log.Printf("[DATACHANNEL] Buffer full: %d bytes, waiting...", buffered)
}
queueStart := time.Now()
waitStart := time.Now()
for p.dc != nil && p.dc.BufferedAmount() > 256*1024 {
time.Sleep(1 * time.Millisecond)
maxWait := 5 * time.Second
for p.dc.BufferedAmount() > 128*1024 {
if time.Since(waitStart) > maxWait {
log.Printf("[DATACHANNEL] Wait timeout after %v, dropping packet", maxWait)
break
}
time.Sleep(5 * time.Millisecond)
}
waitTime := time.Since(waitStart)
if p.dc != nil && p.dc.ReadyState() == webrtc.DataChannelStateOpen {
sendStart := time.Now()
if err := p.dc.Send(data); err != nil {
logger.Debug("DataChannel send error: %v", err)
} else {
sendTime := time.Since(sendStart)
totalTime := time.Since(queueStart)
if waitTime > 10*time.Millisecond || sendTime > 10*time.Millisecond {
log.Printf("[DATACHANNEL] Sent %d bytes wait=%v send=%v total=%v buffered=%d",
len(data), waitTime, sendTime, totalTime, p.dc.BufferedAmount())
} else {
logger.Verbose("Sent %d bytes to DataChannel (buffered: %d)", len(data), p.dc.BufferedAmount())
}
}
if time.Since(waitStart) > maxWait {
continue
}
sendStart := time.Now()
sendDone := make(chan error, 1)
go func() {
sendDone <- p.dc.Send(data)
}()
select {
case err := <-sendDone:
sendTime := time.Since(sendStart)
totalTime := time.Since(queueStart)
if err != nil {
log.Printf("[DATACHANNEL] Send error: %v", err)
} else if waitTime > 50*time.Millisecond || sendTime > 50*time.Millisecond {
log.Printf("[DATACHANNEL] Sent %d bytes wait=%v send=%v total=%v buffered=%d",
len(data), waitTime, sendTime, totalTime, p.dc.BufferedAmount())
} else {
logger.Verbose("Sent %d bytes to DataChannel (buffered: %d)", len(data), p.dc.BufferedAmount())
}
case <-time.After(3 * time.Second):
log.Printf("[DATACHANNEL] Send timeout after 3s, packet size=%d", len(data))
}
case <-p.closeCh:
return
}