feat(peer): Refactor send queue processing with worker pool and monitoring

This commit is contained in:
zarazaex69
2026-04-09 20:45:57 +03:00
parent 90ffe72e10
commit 36009f3593

View File

@@ -116,11 +116,22 @@ func (p *Peer) Connect(ctx context.Context) error {
dcReady := make(chan struct{})
p.dc.OnOpen(func() {
log.Println("DataChannel opened")
numWorkers := 4
for i := 0; i < numWorkers; i++ {
p.wg.Add(1)
go func(workerID int) {
defer p.wg.Done()
p.processSendQueue(workerID)
}(i)
}
p.wg.Add(1)
go func() {
defer p.wg.Done()
p.processSendQueue()
p.monitorQueue()
}()
close(dcReady)
})
@@ -208,16 +219,12 @@ func (p *Peer) Send(data []byte) error {
return fmt.Errorf("send queue closed")
}
queueLen := len(p.sendQueue)
if queueLen > 500 {
log.Printf("[SEND_QUEUE] Queue length: %d (high!)", queueLen)
}
select {
case p.sendQueue <- data:
return nil
case <-time.After(100 * time.Millisecond):
log.Printf("[SEND_QUEUE] Queue timeout! Dropping packet of %d bytes (queue_len=%d)", len(data), queueLen)
case <-time.After(50 * time.Millisecond):
queueLen := len(p.sendQueue)
log.Printf("[SEND_QUEUE] Timeout! queue_len=%d, dropping packet size=%d", queueLen, len(data))
return fmt.Errorf("send queue timeout")
}
}
@@ -706,8 +713,53 @@ func (p *Peer) WatchConnection(ctx context.Context) {
}
}
func (p *Peer) processSendQueue() {
ticker := time.NewTicker(2 * time.Second)
func (p *Peer) processSendQueue(workerID int) {
log.Printf("[WORKER-%d] Started", workerID)
defer log.Printf("[WORKER-%d] Stopped", workerID)
for {
select {
case data := <-p.sendQueue:
if p.dc == nil || p.dc.ReadyState() != webrtc.DataChannelStateOpen {
continue
}
start := time.Now()
for p.dc.BufferedAmount() > 128*1024 {
time.Sleep(5 * time.Millisecond)
if time.Since(start) > 5*time.Second {
log.Printf("[WORKER-%d] Buffer wait timeout, dropping packet size=%d", workerID, len(data))
break
}
}
if time.Since(start) > 5*time.Second {
continue
}
sendStart := time.Now()
if err := p.dc.Send(data); err != nil {
log.Printf("[WORKER-%d] Send error: %v", workerID, err)
} else {
elapsed := time.Since(sendStart)
if elapsed > 100*time.Millisecond {
log.Printf("[WORKER-%d] Sent %d bytes in %v (buffered: %d)",
workerID, len(data), elapsed, p.dc.BufferedAmount())
} else {
logger.Verbose("[WORKER-%d] Sent %d bytes (buffered: %d)",
workerID, len(data), p.dc.BufferedAmount())
}
}
case <-p.closeCh:
return
}
}
}
func (p *Peer) monitorQueue() {
ticker := time.NewTicker(3 * time.Second)
defer ticker.Stop()
for {
@@ -718,57 +770,9 @@ func (p *Peer) processSendQueue() {
if p.dc != nil {
buffered = p.dc.BufferedAmount()
}
if queueLen > 100 || buffered > 100*1024 {
if queueLen > 100 || buffered > 50*1024 {
log.Printf("[QUEUE_MONITOR] queue_len=%d dc_buffered=%d", queueLen, buffered)
}
case data := <-p.sendQueue:
if p.dc == nil || p.dc.ReadyState() != webrtc.DataChannelStateOpen {
continue
}
queueStart := time.Now()
waitStart := time.Now()
maxWait := 5 * time.Second
for p.dc.BufferedAmount() > 128*1024 {
if time.Since(waitStart) > maxWait {
log.Printf("[DATACHANNEL] Wait timeout after %v, dropping packet", maxWait)
break
}
time.Sleep(5 * time.Millisecond)
}
waitTime := time.Since(waitStart)
if time.Since(waitStart) > maxWait {
continue
}
sendStart := time.Now()
sendDone := make(chan error, 1)
go func() {
sendDone <- p.dc.Send(data)
}()
select {
case err := <-sendDone:
sendTime := time.Since(sendStart)
totalTime := time.Since(queueStart)
if err != nil {
log.Printf("[DATACHANNEL] Send error: %v", err)
} else if waitTime > 50*time.Millisecond || sendTime > 50*time.Millisecond {
log.Printf("[DATACHANNEL] Sent %d bytes wait=%v send=%v total=%v buffered=%d",
len(data), waitTime, sendTime, totalTime, p.dc.BufferedAmount())
} else {
logger.Verbose("Sent %d bytes to DataChannel (buffered: %d)", len(data), p.dc.BufferedAmount())
}
case <-time.After(3 * time.Second):
log.Printf("[DATACHANNEL] Send timeout after 3s, packet size=%d", len(data))
}
case <-p.closeCh:
return
}