diff --git a/internal/server/server.go b/internal/server/server.go index e196dcb..a56f285 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -519,27 +519,18 @@ func (s *Server) dial(req ConnectRequest) (net.Conn, error) { } func (s *Server) pumpToMux(sid uint16, conn net.Conn) { - start := time.Now() - var totalBytes uint64 - var totalReads uint64 - var lastReadAt time.Time = start - var exitReason string - defer func() { s.activeClients.Add(-1) _ = s.mux.CloseStream(sid) s.connMu.Lock() delete(s.connections, sid) s.connMu.Unlock() - logger.Infof("sid=%d pumpToMux exit reason=%s bytes=%d reads=%d uptime=%v sinceLastRead=%v", - sid, exitReason, totalBytes, totalReads, - time.Since(start), time.Since(lastReadAt)) }() // Decoupling queue: Read goroutine pushes here, sender goroutine drains // to mux.SendData. Without this, slow channel back-pressure stalls the // upstream Read which can cause TCP receive window to collapse to zero - // and effectively wedge the connection (selectel stops sending and never + // and effectively wedge the connection (peer stops sending and never // resumes even though our channel is healthy). type chunk struct{ data []byte } queue := make(chan chunk, 64) @@ -557,10 +548,10 @@ func (s *Server) pumpToMux(sid uint16, conn net.Conn) { } }() - // queueHasSpace blocks until the decoupling queue has room or we're - // shutting down. We deliberately wait *before* arming the upstream - // read deadline so that channel back-pressure does not get billed to - // the upstream socket as idle time and trip a spurious i/o timeout. + // queueHasSpace blocks until the decoupling queue has room or the + // sender goroutine has exited. We wait here *before* arming the + // upstream read deadline so that channel back-pressure isn't billed + // to the socket as idle time and doesn't trip a spurious i/o timeout. queueHasSpace := func() bool { for { if len(queue) < cap(queue) { @@ -574,28 +565,6 @@ func (s *Server) pumpToMux(sid uint16, conn net.Conn) { } } - // Periodic read stats so we can see throughput per-sid in real time. - statsTicker := time.NewTicker(5 * time.Second) - defer statsTicker.Stop() - statsStop := make(chan struct{}) - var statsLastBytes uint64 - go func() { - for { - select { - case <-statsStop: - return - case <-statsTicker.C: - cur := atomic.LoadUint64(&totalBytes) - delta := cur - statsLastBytes - statsLastBytes = cur - idle := time.Since(lastReadAt) - logger.Infof("sid=%d upstream rx[5s]: bytes=%d total=%d reads=%d idle=%v queueLen=%d", - sid, delta, cur, atomic.LoadUint64(&totalReads), idle, len(queue)) - } - } - }() - defer close(statsStop) - buf := make([]byte, 16384) // Idle timeout for genuinely dead upstreams. Only armed when we are @@ -605,34 +574,23 @@ func (s *Server) pumpToMux(sid uint16, conn net.Conn) { const idleReadTimeout = 60 * time.Second for { - // Wait until the decoupling queue has space *before* reading from - // upstream. While we are blocked here we leave the read deadline - // disarmed so a slow vp8 channel cannot kill a healthy TCP socket. if !queueHasSpace() { - exitReason = "sender goroutine exited" close(queue) <-doneSender return } - // Arm the deadline only now, when we actually want bytes from the - // peer. If the peer is alive, Read returns quickly and we re-arm - // on the next loop. If the peer truly went silent, we surface - // i/o timeout after idleReadTimeout instead of hanging forever. + // Arm the deadline only when we actually want bytes from the peer. _ = conn.SetReadDeadline(time.Now().Add(idleReadTimeout)) n, err := conn.Read(buf) if err != nil { - exitReason = fmt.Sprintf("read error: %v", err) close(queue) <-doneSender return } - atomic.AddUint64(&totalBytes, uint64(n)) - atomic.AddUint64(&totalReads, 1) - lastReadAt = time.Now() - // Clear the deadline so it does not fire while we are blocked in + // Clear the deadline so it doesn't fire while we are blocked in // queueHasSpace() on the next iteration (back-pressure path). _ = conn.SetReadDeadline(time.Time{}) @@ -641,8 +599,7 @@ func (s *Server) pumpToMux(sid uint16, conn net.Conn) { copy(c, buf[:n]) // Guaranteed non-blocking thanks to queueHasSpace() above (we are - // the sole producer). Falling back to a blocking send is still - // safe but should never be needed. + // the sole producer); the blocking fallback is just defensive. select { case queue <- chunk{data: c}: default: diff --git a/internal/transport/vp8channel/transport.go b/internal/transport/vp8channel/transport.go index 2b4b053..ba6719b 100644 --- a/internal/transport/vp8channel/transport.go +++ b/internal/transport/vp8channel/transport.go @@ -5,7 +5,6 @@ import ( "encoding/binary" "errors" "fmt" - "log" "sync" "sync/atomic" "time" @@ -59,15 +58,6 @@ type streamTransport struct { startOnce sync.Once frameInterval time.Duration batchSize int - - // TX metrics - txSendCalls atomic.Uint64 // Send() invocations - txSendBlocked atomic.Uint64 // Send() that had to block on full outbound - txDataSent atomic.Uint64 // data samples written to track - txDataBytes atomic.Uint64 // total bytes of data samples - txKeepaliveTx atomic.Uint64 // keepalive samples written - txWriteErrors atomic.Uint64 // WriteSample errors - txOutboundPeak atomic.Uint64 // max observed outbound depth in window } func New(ctx context.Context, cfg transport.Config) (transport.Transport, error) { @@ -153,19 +143,7 @@ func (p *streamTransport) Send(data []byte) error { } frame := encodeDataFrame(data) - p.txSendCalls.Add(1) - // Fast path - select { - case <-p.closeCh: - return ErrTransportClosed - case p.outbound <- frame: - return nil - default: - } - - // Slow path: outbound full, account for the stall. - p.txSendBlocked.Add(1) select { case <-p.closeCh: return ErrTransportClosed @@ -254,53 +232,15 @@ func (p *streamTransport) writerLoop() { } idleTicks := 0 - reportTicker := time.NewTicker(5 * time.Second) - defer reportTicker.Stop() - var lastCalls, lastBlocked, lastData, lastBytes, lastKa, lastErrs uint64 - reportFn := func() { - calls := p.txSendCalls.Load() - blocked := p.txSendBlocked.Load() - data := p.txDataSent.Load() - bytes := p.txDataBytes.Load() - ka := p.txKeepaliveTx.Load() - errs := p.txWriteErrors.Load() - peak := p.txOutboundPeak.Swap(0) - - dCalls := calls - lastCalls - dBlocked := blocked - lastBlocked - dData := data - lastData - dBytes := bytes - lastBytes - dKa := ka - lastKa - dErrs := errs - lastErrs - - lastCalls, lastBlocked, lastData, lastBytes, lastKa, lastErrs = - calls, blocked, data, bytes, ka, errs - - log.Printf("vp8channel tx[5s]: send=%d blocked=%d data=%d bytes=%d ka=%d werr=%d outQ=%d/%d peak=%d kbps=%.1f", - dCalls, dBlocked, dData, dBytes, dKa, dErrs, - len(p.outbound), cap(p.outbound), peak, - float64(dBytes)*8/1024/5) - } - for { select { case <-p.closeCh: - reportFn() return - case <-reportTicker.C: - reportFn() case <-ticker.C: - // Track outbound queue peak for diagnostics. - if depth := uint64(len(p.outbound)); depth > p.txOutboundPeak.Load() { - p.txOutboundPeak.Store(depth) - } - var sample []byte - isData := false select { case frame := <-p.outbound: sample = frame - isData = true idleTicks = 0 default: idleTicks++ @@ -311,21 +251,10 @@ func (p *streamTransport) writerLoop() { sample = vp8Keepalive } - err := p.track.WriteSample(media.Sample{ + _ = p.track.WriteSample(media.Sample{ Data: sample, Duration: sampleInterval, }) - if err != nil { - p.txWriteErrors.Add(1) - continue - } - - if isData { - p.txDataSent.Add(1) - p.txDataBytes.Add(uint64(len(sample))) - } else { - p.txKeepaliveTx.Add(1) - } } } } @@ -357,72 +286,11 @@ func (p *streamTransport) readVP8Track(track *webrtc.TrackRemote) { var haveLastSeq bool frameValid := false - // Loss / reorder metrics, reported every 5s. - var ( - rtpPackets uint64 - rtpLostPkts uint64 // packets we never saw (gap size sum) - rtpReorders uint64 // out-of-order arrivals (small backwards jumps) - framesTotal uint64 // frames marked complete (Marker) - framesValid uint64 // delivered to dispatch - framesInvalid uint64 // dropped because of mid-frame loss - droppedQueue uint64 // dropped because inbound chan was full - ) - reportTicker := time.NewTicker(5 * time.Second) - defer reportTicker.Stop() - reportDone := make(chan struct{}) - go func() { - defer close(reportDone) - var lastPkts, lastLost, lastReorder, lastTotal, lastValid, lastInvalid, lastDropped uint64 - for { - select { - case <-p.closeCh: - return - case <-reportTicker.C: - pkts := atomic.LoadUint64(&rtpPackets) - lost := atomic.LoadUint64(&rtpLostPkts) - reord := atomic.LoadUint64(&rtpReorders) - total := atomic.LoadUint64(&framesTotal) - valid := atomic.LoadUint64(&framesValid) - invalid := atomic.LoadUint64(&framesInvalid) - dropped := atomic.LoadUint64(&droppedQueue) - - dPkts := pkts - lastPkts - dLost := lost - lastLost - dReord := reord - lastReorder - dTotal := total - lastTotal - dValid := valid - lastValid - dInvalid := invalid - lastInvalid - dDropped := dropped - lastDropped - - lastPkts, lastLost, lastReorder = pkts, lost, reord - lastTotal, lastValid, lastInvalid, lastDropped = total, valid, invalid, dropped - - if dPkts == 0 && dTotal == 0 { - continue - } - lossPct := 0.0 - if total := dPkts + dLost; total > 0 { - lossPct = float64(dLost) * 100 / float64(total) - } - frameLossPct := 0.0 - if dTotal > 0 { - frameLossPct = float64(dInvalid) * 100 / float64(dTotal) - } - log.Printf("vp8channel rx[5s]: rtp=%d lost=%d (%.2f%%) reorder=%d frames=%d ok=%d bad=%d (%.2f%%) qdrop=%d inboundQ=%d/%d", - dPkts, dLost, lossPct, dReord, - dTotal, dValid, dInvalid, frameLossPct, - dDropped, len(p.inbound), cap(p.inbound)) - } - } - }() - for { n, _, err := track.Read(buf) if err != nil { - <-reportDone return } - atomic.AddUint64(&rtpPackets, 1) pkt := &rtp.Packet{} if pkt.Unmarshal(buf[:n]) != nil { @@ -437,12 +305,6 @@ func (p *streamTransport) readVP8Track(track *webrtc.TrackRemote) { if haveLastSeq { expected := lastSeq + 1 if pkt.SequenceNumber != expected { - diff := int16(pkt.SequenceNumber - expected) - if diff > 0 { - atomic.AddUint64(&rtpLostPkts, uint64(diff)) - } else { - atomic.AddUint64(&rtpReorders, 1) - } frameValid = false frameBuf = frameBuf[:0] } @@ -469,15 +331,12 @@ func (p *streamTransport) readVP8Track(track *webrtc.TrackRemote) { frameBuf = append(frameBuf, vp8Payload...) if pkt.Marker { - atomic.AddUint64(&framesTotal, 1) data := extractDataFromPayload(frameBuf) frameBuf = frameBuf[:0] frameValid = false if data == nil { - atomic.AddUint64(&framesInvalid, 1) continue } - atomic.AddUint64(&framesValid, 1) // Copy out of the shared frame buffer before handing the // payload to the dispatch goroutine. payload := make([]byte, len(data)) @@ -489,7 +348,6 @@ func (p *streamTransport) readVP8Track(track *webrtc.TrackRemote) { select { case p.inbound <- payload: default: - atomic.AddUint64(&droppedQueue, 1) } } }