diff --git a/internal/client/client.go b/internal/client/client.go index ace4a8b..6491506 100644 --- a/internal/client/client.go +++ b/internal/client/client.go @@ -262,8 +262,19 @@ func (c *Client) handleSOCKS5(conn net.Conn) { reqData, _ := json.Marshal(req) sendTime := time.Now() + + queueLen := 0 + buffered := uint64(0) + for _, peer := range c.peers { + if peer != nil { + queueLen += len(peer.GetSendQueue()) + buffered += peer.GetBufferedAmount() + } + } + c.mux.SendData(sid, reqData) - log.Printf("[CLIENT] sid=%d SEND_REQUEST elapsed=%v", sid, time.Since(sendTime)) + log.Printf("[CLIENT] sid=%d SEND_REQUEST elapsed=%v queue_len=%d dc_buffered=%d", + sid, time.Since(sendTime), queueLen, buffered) dataReady := c.mux.WaitForData(sid) timeout := time.NewTimer(10 * time.Second) diff --git a/internal/telemost/peer.go b/internal/telemost/peer.go index be809d9..17c5929 100644 --- a/internal/telemost/peer.go +++ b/internal/telemost/peer.go @@ -37,6 +37,17 @@ type Peer struct { wg sync.WaitGroup } +func (p *Peer) GetSendQueue() chan []byte { + return p.sendQueue +} + +func (p *Peer) GetBufferedAmount() uint64 { + if p.dc != nil { + return p.dc.BufferedAmount() + } + return 0 +} + func NewPeer(roomURL, name string, onData func([]byte)) (*Peer, error) { conn, err := GetConnectionInfo(roomURL, name) if err != nil { @@ -696,8 +707,21 @@ func (p *Peer) WatchConnection(ctx context.Context) { } func (p *Peer) processSendQueue() { + ticker := time.NewTicker(2 * time.Second) + defer ticker.Stop() + for { select { + case <-ticker.C: + queueLen := len(p.sendQueue) + buffered := uint64(0) + if p.dc != nil { + buffered = p.dc.BufferedAmount() + } + if queueLen > 50 || buffered > 100*1024 { + log.Printf("[QUEUE_MONITOR] queue_len=%d dc_buffered=%d", queueLen, buffered) + } + case data := <-p.sendQueue: queueStart := time.Now() buffered := uint64(0)