mirror of
https://github.com/openlibrecommunity/olcrtc.git
synced 2026-06-02 18:39:46 +00:00
refactor: remove telemetry and metrics collection
This commit is contained in:
@@ -519,27 +519,18 @@ func (s *Server) dial(req ConnectRequest) (net.Conn, error) {
|
||||
}
|
||||
|
||||
func (s *Server) pumpToMux(sid uint16, conn net.Conn) {
|
||||
start := time.Now()
|
||||
var totalBytes uint64
|
||||
var totalReads uint64
|
||||
var lastReadAt time.Time = start
|
||||
var exitReason string
|
||||
|
||||
defer func() {
|
||||
s.activeClients.Add(-1)
|
||||
_ = s.mux.CloseStream(sid)
|
||||
s.connMu.Lock()
|
||||
delete(s.connections, sid)
|
||||
s.connMu.Unlock()
|
||||
logger.Infof("sid=%d pumpToMux exit reason=%s bytes=%d reads=%d uptime=%v sinceLastRead=%v",
|
||||
sid, exitReason, totalBytes, totalReads,
|
||||
time.Since(start), time.Since(lastReadAt))
|
||||
}()
|
||||
|
||||
// Decoupling queue: Read goroutine pushes here, sender goroutine drains
|
||||
// to mux.SendData. Without this, slow channel back-pressure stalls the
|
||||
// upstream Read which can cause TCP receive window to collapse to zero
|
||||
// and effectively wedge the connection (selectel stops sending and never
|
||||
// and effectively wedge the connection (peer stops sending and never
|
||||
// resumes even though our channel is healthy).
|
||||
type chunk struct{ data []byte }
|
||||
queue := make(chan chunk, 64)
|
||||
@@ -557,10 +548,10 @@ func (s *Server) pumpToMux(sid uint16, conn net.Conn) {
|
||||
}
|
||||
}()
|
||||
|
||||
// queueHasSpace blocks until the decoupling queue has room or we're
|
||||
// shutting down. We deliberately wait *before* arming the upstream
|
||||
// read deadline so that channel back-pressure does not get billed to
|
||||
// the upstream socket as idle time and trip a spurious i/o timeout.
|
||||
// queueHasSpace blocks until the decoupling queue has room or the
|
||||
// sender goroutine has exited. We wait here *before* arming the
|
||||
// upstream read deadline so that channel back-pressure isn't billed
|
||||
// to the socket as idle time and doesn't trip a spurious i/o timeout.
|
||||
queueHasSpace := func() bool {
|
||||
for {
|
||||
if len(queue) < cap(queue) {
|
||||
@@ -574,28 +565,6 @@ func (s *Server) pumpToMux(sid uint16, conn net.Conn) {
|
||||
}
|
||||
}
|
||||
|
||||
// Periodic read stats so we can see throughput per-sid in real time.
|
||||
statsTicker := time.NewTicker(5 * time.Second)
|
||||
defer statsTicker.Stop()
|
||||
statsStop := make(chan struct{})
|
||||
var statsLastBytes uint64
|
||||
go func() {
|
||||
for {
|
||||
select {
|
||||
case <-statsStop:
|
||||
return
|
||||
case <-statsTicker.C:
|
||||
cur := atomic.LoadUint64(&totalBytes)
|
||||
delta := cur - statsLastBytes
|
||||
statsLastBytes = cur
|
||||
idle := time.Since(lastReadAt)
|
||||
logger.Infof("sid=%d upstream rx[5s]: bytes=%d total=%d reads=%d idle=%v queueLen=%d",
|
||||
sid, delta, cur, atomic.LoadUint64(&totalReads), idle, len(queue))
|
||||
}
|
||||
}
|
||||
}()
|
||||
defer close(statsStop)
|
||||
|
||||
buf := make([]byte, 16384)
|
||||
|
||||
// Idle timeout for genuinely dead upstreams. Only armed when we are
|
||||
@@ -605,34 +574,23 @@ func (s *Server) pumpToMux(sid uint16, conn net.Conn) {
|
||||
const idleReadTimeout = 60 * time.Second
|
||||
|
||||
for {
|
||||
// Wait until the decoupling queue has space *before* reading from
|
||||
// upstream. While we are blocked here we leave the read deadline
|
||||
// disarmed so a slow vp8 channel cannot kill a healthy TCP socket.
|
||||
if !queueHasSpace() {
|
||||
exitReason = "sender goroutine exited"
|
||||
close(queue)
|
||||
<-doneSender
|
||||
return
|
||||
}
|
||||
|
||||
// Arm the deadline only now, when we actually want bytes from the
|
||||
// peer. If the peer is alive, Read returns quickly and we re-arm
|
||||
// on the next loop. If the peer truly went silent, we surface
|
||||
// i/o timeout after idleReadTimeout instead of hanging forever.
|
||||
// Arm the deadline only when we actually want bytes from the peer.
|
||||
_ = conn.SetReadDeadline(time.Now().Add(idleReadTimeout))
|
||||
|
||||
n, err := conn.Read(buf)
|
||||
if err != nil {
|
||||
exitReason = fmt.Sprintf("read error: %v", err)
|
||||
close(queue)
|
||||
<-doneSender
|
||||
return
|
||||
}
|
||||
atomic.AddUint64(&totalBytes, uint64(n))
|
||||
atomic.AddUint64(&totalReads, 1)
|
||||
lastReadAt = time.Now()
|
||||
|
||||
// Clear the deadline so it does not fire while we are blocked in
|
||||
// Clear the deadline so it doesn't fire while we are blocked in
|
||||
// queueHasSpace() on the next iteration (back-pressure path).
|
||||
_ = conn.SetReadDeadline(time.Time{})
|
||||
|
||||
@@ -641,8 +599,7 @@ func (s *Server) pumpToMux(sid uint16, conn net.Conn) {
|
||||
copy(c, buf[:n])
|
||||
|
||||
// Guaranteed non-blocking thanks to queueHasSpace() above (we are
|
||||
// the sole producer). Falling back to a blocking send is still
|
||||
// safe but should never be needed.
|
||||
// the sole producer); the blocking fallback is just defensive.
|
||||
select {
|
||||
case queue <- chunk{data: c}:
|
||||
default:
|
||||
|
||||
@@ -5,7 +5,6 @@ import (
|
||||
"encoding/binary"
|
||||
"errors"
|
||||
"fmt"
|
||||
"log"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
@@ -59,15 +58,6 @@ type streamTransport struct {
|
||||
startOnce sync.Once
|
||||
frameInterval time.Duration
|
||||
batchSize int
|
||||
|
||||
// TX metrics
|
||||
txSendCalls atomic.Uint64 // Send() invocations
|
||||
txSendBlocked atomic.Uint64 // Send() that had to block on full outbound
|
||||
txDataSent atomic.Uint64 // data samples written to track
|
||||
txDataBytes atomic.Uint64 // total bytes of data samples
|
||||
txKeepaliveTx atomic.Uint64 // keepalive samples written
|
||||
txWriteErrors atomic.Uint64 // WriteSample errors
|
||||
txOutboundPeak atomic.Uint64 // max observed outbound depth in window
|
||||
}
|
||||
|
||||
func New(ctx context.Context, cfg transport.Config) (transport.Transport, error) {
|
||||
@@ -153,19 +143,7 @@ func (p *streamTransport) Send(data []byte) error {
|
||||
}
|
||||
|
||||
frame := encodeDataFrame(data)
|
||||
p.txSendCalls.Add(1)
|
||||
|
||||
// Fast path
|
||||
select {
|
||||
case <-p.closeCh:
|
||||
return ErrTransportClosed
|
||||
case p.outbound <- frame:
|
||||
return nil
|
||||
default:
|
||||
}
|
||||
|
||||
// Slow path: outbound full, account for the stall.
|
||||
p.txSendBlocked.Add(1)
|
||||
select {
|
||||
case <-p.closeCh:
|
||||
return ErrTransportClosed
|
||||
@@ -254,53 +232,15 @@ func (p *streamTransport) writerLoop() {
|
||||
}
|
||||
idleTicks := 0
|
||||
|
||||
reportTicker := time.NewTicker(5 * time.Second)
|
||||
defer reportTicker.Stop()
|
||||
var lastCalls, lastBlocked, lastData, lastBytes, lastKa, lastErrs uint64
|
||||
reportFn := func() {
|
||||
calls := p.txSendCalls.Load()
|
||||
blocked := p.txSendBlocked.Load()
|
||||
data := p.txDataSent.Load()
|
||||
bytes := p.txDataBytes.Load()
|
||||
ka := p.txKeepaliveTx.Load()
|
||||
errs := p.txWriteErrors.Load()
|
||||
peak := p.txOutboundPeak.Swap(0)
|
||||
|
||||
dCalls := calls - lastCalls
|
||||
dBlocked := blocked - lastBlocked
|
||||
dData := data - lastData
|
||||
dBytes := bytes - lastBytes
|
||||
dKa := ka - lastKa
|
||||
dErrs := errs - lastErrs
|
||||
|
||||
lastCalls, lastBlocked, lastData, lastBytes, lastKa, lastErrs =
|
||||
calls, blocked, data, bytes, ka, errs
|
||||
|
||||
log.Printf("vp8channel tx[5s]: send=%d blocked=%d data=%d bytes=%d ka=%d werr=%d outQ=%d/%d peak=%d kbps=%.1f",
|
||||
dCalls, dBlocked, dData, dBytes, dKa, dErrs,
|
||||
len(p.outbound), cap(p.outbound), peak,
|
||||
float64(dBytes)*8/1024/5)
|
||||
}
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-p.closeCh:
|
||||
reportFn()
|
||||
return
|
||||
case <-reportTicker.C:
|
||||
reportFn()
|
||||
case <-ticker.C:
|
||||
// Track outbound queue peak for diagnostics.
|
||||
if depth := uint64(len(p.outbound)); depth > p.txOutboundPeak.Load() {
|
||||
p.txOutboundPeak.Store(depth)
|
||||
}
|
||||
|
||||
var sample []byte
|
||||
isData := false
|
||||
select {
|
||||
case frame := <-p.outbound:
|
||||
sample = frame
|
||||
isData = true
|
||||
idleTicks = 0
|
||||
default:
|
||||
idleTicks++
|
||||
@@ -311,21 +251,10 @@ func (p *streamTransport) writerLoop() {
|
||||
sample = vp8Keepalive
|
||||
}
|
||||
|
||||
err := p.track.WriteSample(media.Sample{
|
||||
_ = p.track.WriteSample(media.Sample{
|
||||
Data: sample,
|
||||
Duration: sampleInterval,
|
||||
})
|
||||
if err != nil {
|
||||
p.txWriteErrors.Add(1)
|
||||
continue
|
||||
}
|
||||
|
||||
if isData {
|
||||
p.txDataSent.Add(1)
|
||||
p.txDataBytes.Add(uint64(len(sample)))
|
||||
} else {
|
||||
p.txKeepaliveTx.Add(1)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -357,72 +286,11 @@ func (p *streamTransport) readVP8Track(track *webrtc.TrackRemote) {
|
||||
var haveLastSeq bool
|
||||
frameValid := false
|
||||
|
||||
// Loss / reorder metrics, reported every 5s.
|
||||
var (
|
||||
rtpPackets uint64
|
||||
rtpLostPkts uint64 // packets we never saw (gap size sum)
|
||||
rtpReorders uint64 // out-of-order arrivals (small backwards jumps)
|
||||
framesTotal uint64 // frames marked complete (Marker)
|
||||
framesValid uint64 // delivered to dispatch
|
||||
framesInvalid uint64 // dropped because of mid-frame loss
|
||||
droppedQueue uint64 // dropped because inbound chan was full
|
||||
)
|
||||
reportTicker := time.NewTicker(5 * time.Second)
|
||||
defer reportTicker.Stop()
|
||||
reportDone := make(chan struct{})
|
||||
go func() {
|
||||
defer close(reportDone)
|
||||
var lastPkts, lastLost, lastReorder, lastTotal, lastValid, lastInvalid, lastDropped uint64
|
||||
for {
|
||||
select {
|
||||
case <-p.closeCh:
|
||||
return
|
||||
case <-reportTicker.C:
|
||||
pkts := atomic.LoadUint64(&rtpPackets)
|
||||
lost := atomic.LoadUint64(&rtpLostPkts)
|
||||
reord := atomic.LoadUint64(&rtpReorders)
|
||||
total := atomic.LoadUint64(&framesTotal)
|
||||
valid := atomic.LoadUint64(&framesValid)
|
||||
invalid := atomic.LoadUint64(&framesInvalid)
|
||||
dropped := atomic.LoadUint64(&droppedQueue)
|
||||
|
||||
dPkts := pkts - lastPkts
|
||||
dLost := lost - lastLost
|
||||
dReord := reord - lastReorder
|
||||
dTotal := total - lastTotal
|
||||
dValid := valid - lastValid
|
||||
dInvalid := invalid - lastInvalid
|
||||
dDropped := dropped - lastDropped
|
||||
|
||||
lastPkts, lastLost, lastReorder = pkts, lost, reord
|
||||
lastTotal, lastValid, lastInvalid, lastDropped = total, valid, invalid, dropped
|
||||
|
||||
if dPkts == 0 && dTotal == 0 {
|
||||
continue
|
||||
}
|
||||
lossPct := 0.0
|
||||
if total := dPkts + dLost; total > 0 {
|
||||
lossPct = float64(dLost) * 100 / float64(total)
|
||||
}
|
||||
frameLossPct := 0.0
|
||||
if dTotal > 0 {
|
||||
frameLossPct = float64(dInvalid) * 100 / float64(dTotal)
|
||||
}
|
||||
log.Printf("vp8channel rx[5s]: rtp=%d lost=%d (%.2f%%) reorder=%d frames=%d ok=%d bad=%d (%.2f%%) qdrop=%d inboundQ=%d/%d",
|
||||
dPkts, dLost, lossPct, dReord,
|
||||
dTotal, dValid, dInvalid, frameLossPct,
|
||||
dDropped, len(p.inbound), cap(p.inbound))
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
for {
|
||||
n, _, err := track.Read(buf)
|
||||
if err != nil {
|
||||
<-reportDone
|
||||
return
|
||||
}
|
||||
atomic.AddUint64(&rtpPackets, 1)
|
||||
|
||||
pkt := &rtp.Packet{}
|
||||
if pkt.Unmarshal(buf[:n]) != nil {
|
||||
@@ -437,12 +305,6 @@ func (p *streamTransport) readVP8Track(track *webrtc.TrackRemote) {
|
||||
if haveLastSeq {
|
||||
expected := lastSeq + 1
|
||||
if pkt.SequenceNumber != expected {
|
||||
diff := int16(pkt.SequenceNumber - expected)
|
||||
if diff > 0 {
|
||||
atomic.AddUint64(&rtpLostPkts, uint64(diff))
|
||||
} else {
|
||||
atomic.AddUint64(&rtpReorders, 1)
|
||||
}
|
||||
frameValid = false
|
||||
frameBuf = frameBuf[:0]
|
||||
}
|
||||
@@ -469,15 +331,12 @@ func (p *streamTransport) readVP8Track(track *webrtc.TrackRemote) {
|
||||
frameBuf = append(frameBuf, vp8Payload...)
|
||||
|
||||
if pkt.Marker {
|
||||
atomic.AddUint64(&framesTotal, 1)
|
||||
data := extractDataFromPayload(frameBuf)
|
||||
frameBuf = frameBuf[:0]
|
||||
frameValid = false
|
||||
if data == nil {
|
||||
atomic.AddUint64(&framesInvalid, 1)
|
||||
continue
|
||||
}
|
||||
atomic.AddUint64(&framesValid, 1)
|
||||
// Copy out of the shared frame buffer before handing the
|
||||
// payload to the dispatch goroutine.
|
||||
payload := make([]byte, len(data))
|
||||
@@ -489,7 +348,6 @@ func (p *streamTransport) readVP8Track(track *webrtc.TrackRemote) {
|
||||
select {
|
||||
case p.inbound <- payload:
|
||||
default:
|
||||
atomic.AddUint64(&droppedQueue, 1)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user