diff --git a/internal/server/server.go b/internal/server/server.go index 1630353..46608a4 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -200,6 +200,9 @@ func (s *Server) onData(data []byte) { } func (s *Server) run(ctx context.Context) error { + ticker := time.NewTicker(10 * time.Millisecond) + defer ticker.Stop() + for { select { case <-ctx.Done(): @@ -220,7 +223,8 @@ func (s *Server) run(ctx context.Context) error { log.Println("All peers closed") return nil - default: + + case <-ticker.C: } sids := s.mux.GetStreams() diff --git a/internal/telemost/peer.go b/internal/telemost/peer.go index 17c5929..6810734 100644 --- a/internal/telemost/peer.go +++ b/internal/telemost/peer.go @@ -62,7 +62,7 @@ func NewPeer(roomURL, name string, onData func([]byte)) (*Peer, error) { reconnectCh: make(chan struct{}, 1), closeCh: make(chan struct{}), keepAliveCh: make(chan struct{}), - sendQueue: make(chan []byte, 1000), + sendQueue: make(chan []byte, 5000), }, nil } @@ -209,16 +209,16 @@ func (p *Peer) Send(data []byte) error { } queueLen := len(p.sendQueue) - if queueLen > 100 { + if queueLen > 500 { log.Printf("[SEND_QUEUE] Queue length: %d (high!)", queueLen) } select { case p.sendQueue <- data: return nil - default: - log.Printf("[SEND_QUEUE] Queue full! Dropping packet of %d bytes", len(data)) - return fmt.Errorf("send queue full") + case <-time.After(100 * time.Millisecond): + log.Printf("[SEND_QUEUE] Queue timeout! Dropping packet of %d bytes (queue_len=%d)", len(data), queueLen) + return fmt.Errorf("send queue timeout") } } @@ -718,42 +718,57 @@ func (p *Peer) processSendQueue() { if p.dc != nil { buffered = p.dc.BufferedAmount() } - if queueLen > 50 || buffered > 100*1024 { + if queueLen > 100 || buffered > 100*1024 { log.Printf("[QUEUE_MONITOR] queue_len=%d dc_buffered=%d", queueLen, buffered) } case data := <-p.sendQueue: - queueStart := time.Now() - buffered := uint64(0) - if p.dc != nil { - buffered = p.dc.BufferedAmount() + if p.dc == nil || p.dc.ReadyState() != webrtc.DataChannelStateOpen { + continue } - if buffered > 256*1024 { - log.Printf("[DATACHANNEL] Buffer full: %d bytes, waiting...", buffered) - } + queueStart := time.Now() waitStart := time.Now() - for p.dc != nil && p.dc.BufferedAmount() > 256*1024 { - time.Sleep(1 * time.Millisecond) + maxWait := 5 * time.Second + for p.dc.BufferedAmount() > 128*1024 { + if time.Since(waitStart) > maxWait { + log.Printf("[DATACHANNEL] Wait timeout after %v, dropping packet", maxWait) + break + } + time.Sleep(5 * time.Millisecond) } waitTime := time.Since(waitStart) - if p.dc != nil && p.dc.ReadyState() == webrtc.DataChannelStateOpen { - sendStart := time.Now() - if err := p.dc.Send(data); err != nil { - logger.Debug("DataChannel send error: %v", err) - } else { - sendTime := time.Since(sendStart) - totalTime := time.Since(queueStart) - if waitTime > 10*time.Millisecond || sendTime > 10*time.Millisecond { - log.Printf("[DATACHANNEL] Sent %d bytes wait=%v send=%v total=%v buffered=%d", - len(data), waitTime, sendTime, totalTime, p.dc.BufferedAmount()) - } else { - logger.Verbose("Sent %d bytes to DataChannel (buffered: %d)", len(data), p.dc.BufferedAmount()) - } - } + if time.Since(waitStart) > maxWait { + continue } + + sendStart := time.Now() + sendDone := make(chan error, 1) + + go func() { + sendDone <- p.dc.Send(data) + }() + + select { + case err := <-sendDone: + sendTime := time.Since(sendStart) + totalTime := time.Since(queueStart) + + if err != nil { + log.Printf("[DATACHANNEL] Send error: %v", err) + } else if waitTime > 50*time.Millisecond || sendTime > 50*time.Millisecond { + log.Printf("[DATACHANNEL] Sent %d bytes wait=%v send=%v total=%v buffered=%d", + len(data), waitTime, sendTime, totalTime, p.dc.BufferedAmount()) + } else { + logger.Verbose("Sent %d bytes to DataChannel (buffered: %d)", len(data), p.dc.BufferedAmount()) + } + + case <-time.After(3 * time.Second): + log.Printf("[DATACHANNEL] Send timeout after 3s, packet size=%d", len(data)) + } + case <-p.closeCh: return }