mirror of
https://github.com/openlibrecommunity/olcrtc.git
synced 2026-05-30 17:09:43 +00:00
perf(client,mux,server,peer): Remove verbose logging and optimize buffer thresholds
This commit is contained in:
@@ -166,7 +166,6 @@ func (c *Client) onData(data []byte) {
|
||||
return
|
||||
}
|
||||
|
||||
logger.Verbose("Received %d bytes from server", len(plaintext))
|
||||
c.mux.HandleFrame(plaintext)
|
||||
}
|
||||
|
||||
@@ -208,7 +207,6 @@ func (c *Client) runSOCKS5(ctx context.Context, port int) error {
|
||||
|
||||
func (c *Client) handleSOCKS5(conn net.Conn) {
|
||||
defer conn.Close()
|
||||
startTime := time.Now()
|
||||
|
||||
buf := make([]byte, 256)
|
||||
|
||||
@@ -275,36 +273,20 @@ func (c *Client) handleSOCKS5(conn net.Conn) {
|
||||
}
|
||||
|
||||
reqData, _ := json.Marshal(req)
|
||||
sendTime := time.Now()
|
||||
|
||||
queueLen := 0
|
||||
buffered := uint64(0)
|
||||
for _, peer := range c.peers {
|
||||
if peer != nil {
|
||||
queueLen += len(peer.GetSendQueue())
|
||||
buffered += peer.GetBufferedAmount()
|
||||
}
|
||||
}
|
||||
|
||||
c.mux.SendData(sid, reqData)
|
||||
log.Printf("[CLIENT] sid=%d SEND_REQUEST elapsed=%v queue_len=%d dc_buffered=%d",
|
||||
sid, time.Since(sendTime), queueLen, buffered)
|
||||
|
||||
dataReady := c.mux.WaitForData(sid)
|
||||
timeout := time.NewTimer(10 * time.Second)
|
||||
defer timeout.Stop()
|
||||
|
||||
waitStart := time.Now()
|
||||
select {
|
||||
case <-dataReady:
|
||||
log.Printf("[CLIENT] sid=%d RESPONSE_RECEIVED wait_time=%v total_elapsed=%v", sid, time.Since(waitStart), time.Since(startTime))
|
||||
stream := c.mux.GetStream(sid)
|
||||
if stream == nil || len(stream.RecvBuf()) == 0 {
|
||||
conn.Write([]byte{5, 4, 0, 1, 0, 0, 0, 0, 0, 0})
|
||||
return
|
||||
}
|
||||
case <-timeout.C:
|
||||
log.Printf("[CLIENT] sid=%d TIMEOUT after wait_time=%v total_elapsed=%v", sid, time.Since(waitStart), time.Since(startTime))
|
||||
conn.Write([]byte{5, 4, 0, 1, 0, 0, 0, 0, 0, 0})
|
||||
return
|
||||
}
|
||||
@@ -312,7 +294,6 @@ func (c *Client) handleSOCKS5(conn net.Conn) {
|
||||
c.mux.ReadStream(sid)
|
||||
|
||||
conn.Write([]byte{5, 0, 0, 1, 0, 0, 0, 0, 0, 0})
|
||||
log.Printf("[CLIENT] sid=%d SOCKS5_READY total_elapsed=%v", sid, time.Since(startTime))
|
||||
|
||||
done := make(chan struct{})
|
||||
streamClosed := make(chan struct{})
|
||||
|
||||
@@ -48,7 +48,7 @@ func New(clientID uint32, onSend func([]byte) error) *Multiplexer {
|
||||
clientID: clientID,
|
||||
onSend: onSend,
|
||||
maxStreams: 10000,
|
||||
maxBufferSize: 1024 * 1024,
|
||||
maxBufferSize: 16 * 1024 * 1024,
|
||||
dataReady: make(map[uint16]chan struct{}),
|
||||
sendSeq: make(map[uint16]uint32),
|
||||
}
|
||||
@@ -83,13 +83,16 @@ func (m *Multiplexer) SendData(sid uint16, data []byte) error {
|
||||
m.mu.RUnlock()
|
||||
|
||||
if !exists || stream.closed {
|
||||
logger.Debug("SendData: stream %d not exists or closed", sid)
|
||||
return nil
|
||||
}
|
||||
|
||||
logger.Verbose("SendData: sid=%d, size=%d bytes", sid, len(data))
|
||||
const chunkSize = 7168
|
||||
totalChunks := (len(data) + chunkSize - 1) / chunkSize
|
||||
|
||||
if totalChunks > 10 {
|
||||
logger.Debug("SendData: sid=%d, size=%d bytes, chunks=%d", sid, len(data), totalChunks)
|
||||
}
|
||||
|
||||
const chunkSize = 4096
|
||||
for i := 0; i < len(data); i += chunkSize {
|
||||
end := i + chunkSize
|
||||
if end > len(data) {
|
||||
@@ -111,7 +114,6 @@ func (m *Multiplexer) SendData(sid uint16, data []byte) error {
|
||||
copy(frame[12:], chunk)
|
||||
|
||||
if err := m.onSend(frame); err != nil {
|
||||
logger.Debug("[MUX] sid=%d onSend error: %v", sid, err)
|
||||
return err
|
||||
}
|
||||
}
|
||||
@@ -165,8 +167,6 @@ func (m *Multiplexer) HandleFrame(frame []byte) {
|
||||
sid := binary.BigEndian.Uint16(frame[4:6])
|
||||
length := binary.BigEndian.Uint16(frame[6:8])
|
||||
seq := binary.BigEndian.Uint32(frame[8:12])
|
||||
|
||||
logger.Verbose("[MUX] HandleFrame sid=%d len=%d seq=%d", sid, length, seq)
|
||||
|
||||
if sid == 0xFFFF && length == 0xFFFF {
|
||||
m.mu.Lock()
|
||||
@@ -236,7 +236,6 @@ func (m *Multiplexer) HandleFrame(frame []byte) {
|
||||
stream.recvBuf = append(stream.recvBuf, nextData...)
|
||||
delete(stream.outOfOrder, stream.nextSeq)
|
||||
stream.nextSeq++
|
||||
logger.Verbose("Applied out-of-order packet sid=%d seq=%d", sid, stream.nextSeq-1)
|
||||
} else {
|
||||
break
|
||||
}
|
||||
@@ -253,10 +252,7 @@ func (m *Multiplexer) HandleFrame(frame []byte) {
|
||||
} else if seq > stream.nextSeq {
|
||||
if len(stream.outOfOrder) < 100 {
|
||||
stream.outOfOrder[seq] = append([]byte(nil), data...)
|
||||
logger.Verbose("Buffered out-of-order packet sid=%d seq=%d (expected %d)", sid, seq, stream.nextSeq)
|
||||
}
|
||||
} else {
|
||||
logger.Verbose("Dropped duplicate packet sid=%d seq=%d (expected %d)", sid, seq, stream.nextSeq)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -182,8 +182,6 @@ func (s *Server) onData(data []byte) {
|
||||
return
|
||||
}
|
||||
|
||||
logger.Verbose("Received %d bytes from client", len(plaintext))
|
||||
|
||||
if len(plaintext) >= 12 {
|
||||
clientID := binary.BigEndian.Uint32(plaintext[0:4])
|
||||
sid := binary.BigEndian.Uint16(plaintext[4:6])
|
||||
@@ -342,9 +340,7 @@ func (s *Server) handleConnect(sid uint16, req ConnectRequest) {
|
||||
|
||||
log.Printf("[SERVER] sid=%d CONNECT_SUCCESS dial_time=%v", sid, dialElapsed)
|
||||
|
||||
sendStart := time.Now()
|
||||
s.mux.SendData(sid, []byte{0x00})
|
||||
log.Printf("[SERVER] sid=%d RESPONSE_SENT send_time=%v total_elapsed=%v", sid, time.Since(sendStart), time.Since(startTime))
|
||||
|
||||
go func() {
|
||||
defer func() {
|
||||
@@ -355,9 +351,15 @@ func (s *Server) handleConnect(sid uint16, req ConnectRequest) {
|
||||
}()
|
||||
|
||||
buf := make([]byte, 16384)
|
||||
totalSent := uint64(0)
|
||||
lastLog := time.Now()
|
||||
|
||||
for {
|
||||
n, err := conn.Read(buf)
|
||||
if err != nil {
|
||||
if totalSent > 1024*1024 {
|
||||
log.Printf("[SERVER] sid=%d TRANSFER_COMPLETE total=%d MB", sid, totalSent/(1024*1024))
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
@@ -368,6 +370,12 @@ func (s *Server) handleConnect(sid uint16, req ConnectRequest) {
|
||||
if err := s.mux.SendData(sid, buf[:n]); err != nil {
|
||||
return
|
||||
}
|
||||
|
||||
totalSent += uint64(n)
|
||||
if time.Since(lastLog) > 5*time.Second {
|
||||
log.Printf("[SERVER] sid=%d TRANSFER_PROGRESS sent=%d MB", sid, totalSent/(1024*1024))
|
||||
lastLog = time.Now()
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
@@ -11,7 +11,6 @@ import (
|
||||
|
||||
"github.com/google/uuid"
|
||||
"github.com/gorilla/websocket"
|
||||
"github.com/openlibrecommunity/olcrtc/internal/logger"
|
||||
"github.com/pion/webrtc/v4"
|
||||
)
|
||||
|
||||
@@ -726,30 +725,23 @@ func (p *Peer) processSendQueue(workerID int) {
|
||||
|
||||
start := time.Now()
|
||||
|
||||
for p.dc.BufferedAmount() > 64*1024 {
|
||||
time.Sleep(10 * time.Millisecond)
|
||||
if time.Since(start) > 3*time.Second {
|
||||
for p.dc.BufferedAmount() > 16*1024 {
|
||||
time.Sleep(5 * time.Millisecond)
|
||||
if time.Since(start) > 5*time.Second {
|
||||
log.Printf("[WORKER-%d] Buffer wait timeout, dropping packet size=%d", workerID, len(data))
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if time.Since(start) > 3*time.Second {
|
||||
if time.Since(start) > 5*time.Second {
|
||||
continue
|
||||
}
|
||||
|
||||
sendStart := time.Now()
|
||||
if err := p.dc.Send(data); err != nil {
|
||||
log.Printf("[WORKER-%d] Send error: %v", workerID, err)
|
||||
} else {
|
||||
elapsed := time.Since(sendStart)
|
||||
if elapsed > 50*time.Millisecond {
|
||||
log.Printf("[WORKER-%d] Sent %d bytes in %v (buffered: %d)",
|
||||
workerID, len(data), elapsed, p.dc.BufferedAmount())
|
||||
} else {
|
||||
logger.Verbose("[WORKER-%d] Sent %d bytes (buffered: %d)",
|
||||
workerID, len(data), p.dc.BufferedAmount())
|
||||
}
|
||||
} else if time.Since(start) > 100*time.Millisecond {
|
||||
log.Printf("[WORKER-%d] Sent %d bytes in %v (buffered: %d)",
|
||||
workerID, len(data), time.Since(start), p.dc.BufferedAmount())
|
||||
}
|
||||
|
||||
case <-p.closeCh:
|
||||
|
||||
Reference in New Issue
Block a user