From ec26cce3ddd361f17c72bc47ca0d5e4b7a743597 Mon Sep 17 00:00:00 2001 From: zarazaex69 Date: Thu, 9 Apr 2026 20:49:23 +0300 Subject: [PATCH] feat(client,server,peer): Add backpressure handling and optimize buffer thresholds --- internal/client/client.go | 14 ++++++++++++++ internal/mux/mux.go | 2 +- internal/server/server.go | 29 ++++++++++++++++++++++++++++- internal/telemost/peer.go | 16 ++++++++++------ 4 files changed, 53 insertions(+), 8 deletions(-) diff --git a/internal/client/client.go b/internal/client/client.go index 6491506..4928817 100644 --- a/internal/client/client.go +++ b/internal/client/client.go @@ -76,6 +76,20 @@ func Run(ctx context.Context, roomURL, keyHex string, socksPort int, duo bool) e } c.mux = mux.New(c.clientID, func(frame []byte) error { + for { + canSend := true + for _, peer := range c.peers { + if !peer.CanSend() { + canSend = false + break + } + } + if canSend { + break + } + time.Sleep(10 * time.Millisecond) + } + encrypted, err := c.cipher.Encrypt(frame) if err != nil { return err diff --git a/internal/mux/mux.go b/internal/mux/mux.go index bb3713f..bf5d784 100644 --- a/internal/mux/mux.go +++ b/internal/mux/mux.go @@ -89,7 +89,7 @@ func (m *Multiplexer) SendData(sid uint16, data []byte) error { logger.Verbose("SendData: sid=%d, size=%d bytes", sid, len(data)) - const chunkSize = 7168 + const chunkSize = 4096 for i := 0; i < len(data); i += chunkSize { end := i + chunkSize if end > len(data) { diff --git a/internal/server/server.go b/internal/server/server.go index 46608a4..1305ffa 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -82,6 +82,20 @@ func Run(ctx context.Context, roomURL, keyHex string, duo bool, dnsServer string } s.mux = mux.New(0, func(frame []byte) error { + for { + canSend := true + for _, peer := range s.peers { + if !peer.CanSend() { + canSend = false + break + } + } + if canSend { + break + } + time.Sleep(10 * time.Millisecond) + } + encrypted, err := s.cipher.Encrypt(frame) if err != nil { return err @@ -334,12 +348,16 @@ func (s *Server) handleConnect(sid uint16, req ConnectRequest) { s.connMu.Unlock() }() - buf := make([]byte, 32768) + buf := make([]byte, 16384) for { n, err := conn.Read(buf) if err != nil { return } + + for !s.canSendData() { + time.Sleep(20 * time.Millisecond) + } if err := s.mux.SendData(sid, buf[:n]); err != nil { return @@ -347,3 +365,12 @@ func (s *Server) handleConnect(sid uint16, req ConnectRequest) { } }() } + +func (s *Server) canSendData() bool { + for _, peer := range s.peers { + if !peer.CanSend() { + return false + } + } + return true +} diff --git a/internal/telemost/peer.go b/internal/telemost/peer.go index ecb9cc3..bdac266 100644 --- a/internal/telemost/peer.go +++ b/internal/telemost/peer.go @@ -726,15 +726,15 @@ func (p *Peer) processSendQueue(workerID int) { start := time.Now() - for p.dc.BufferedAmount() > 128*1024 { - time.Sleep(5 * time.Millisecond) - if time.Since(start) > 5*time.Second { + for p.dc.BufferedAmount() > 64*1024 { + time.Sleep(10 * time.Millisecond) + if time.Since(start) > 3*time.Second { log.Printf("[WORKER-%d] Buffer wait timeout, dropping packet size=%d", workerID, len(data)) break } } - if time.Since(start) > 5*time.Second { + if time.Since(start) > 3*time.Second { continue } @@ -743,7 +743,7 @@ func (p *Peer) processSendQueue(workerID int) { log.Printf("[WORKER-%d] Send error: %v", workerID, err) } else { elapsed := time.Since(sendStart) - if elapsed > 100*time.Millisecond { + if elapsed > 50*time.Millisecond { log.Printf("[WORKER-%d] Sent %d bytes in %v (buffered: %d)", workerID, len(data), elapsed, p.dc.BufferedAmount()) } else { @@ -770,7 +770,7 @@ func (p *Peer) monitorQueue() { if p.dc != nil { buffered = p.dc.BufferedAmount() } - if queueLen > 100 || buffered > 50*1024 { + if queueLen > 500 || buffered > 50*1024 { log.Printf("[QUEUE_MONITOR] queue_len=%d dc_buffered=%d", queueLen, buffered) } case <-p.closeCh: @@ -778,3 +778,7 @@ func (p *Peer) monitorQueue() { } } } + +func (p *Peer) CanSend() bool { + return len(p.sendQueue) < 3000 +}