From a758b6fb2dcc04f2f279b09a36b4ee5e8b0b5a03 Mon Sep 17 00:00:00 2001 From: zarazaex69 Date: Fri, 10 Apr 2026 14:54:21 +0300 Subject: [PATCH] perf(client,mux,server,peer): Remove verbose logging and optimize buffer thresholds --- internal/client/client.go | 19 ------------------- internal/mux/mux.go | 18 +++++++----------- internal/server/server.go | 16 ++++++++++++---- internal/telemost/peer.go | 22 +++++++--------------- 4 files changed, 26 insertions(+), 49 deletions(-) diff --git a/internal/client/client.go b/internal/client/client.go index 4928817..4f53819 100644 --- a/internal/client/client.go +++ b/internal/client/client.go @@ -166,7 +166,6 @@ func (c *Client) onData(data []byte) { return } - logger.Verbose("Received %d bytes from server", len(plaintext)) c.mux.HandleFrame(plaintext) } @@ -208,7 +207,6 @@ func (c *Client) runSOCKS5(ctx context.Context, port int) error { func (c *Client) handleSOCKS5(conn net.Conn) { defer conn.Close() - startTime := time.Now() buf := make([]byte, 256) @@ -275,36 +273,20 @@ func (c *Client) handleSOCKS5(conn net.Conn) { } reqData, _ := json.Marshal(req) - sendTime := time.Now() - - queueLen := 0 - buffered := uint64(0) - for _, peer := range c.peers { - if peer != nil { - queueLen += len(peer.GetSendQueue()) - buffered += peer.GetBufferedAmount() - } - } - c.mux.SendData(sid, reqData) - log.Printf("[CLIENT] sid=%d SEND_REQUEST elapsed=%v queue_len=%d dc_buffered=%d", - sid, time.Since(sendTime), queueLen, buffered) dataReady := c.mux.WaitForData(sid) timeout := time.NewTimer(10 * time.Second) defer timeout.Stop() - waitStart := time.Now() select { case <-dataReady: - log.Printf("[CLIENT] sid=%d RESPONSE_RECEIVED wait_time=%v total_elapsed=%v", sid, time.Since(waitStart), time.Since(startTime)) stream := c.mux.GetStream(sid) if stream == nil || len(stream.RecvBuf()) == 0 { conn.Write([]byte{5, 4, 0, 1, 0, 0, 0, 0, 0, 0}) return } case <-timeout.C: - log.Printf("[CLIENT] sid=%d TIMEOUT after wait_time=%v total_elapsed=%v", sid, time.Since(waitStart), time.Since(startTime)) conn.Write([]byte{5, 4, 0, 1, 0, 0, 0, 0, 0, 0}) return } @@ -312,7 +294,6 @@ func (c *Client) handleSOCKS5(conn net.Conn) { c.mux.ReadStream(sid) conn.Write([]byte{5, 0, 0, 1, 0, 0, 0, 0, 0, 0}) - log.Printf("[CLIENT] sid=%d SOCKS5_READY total_elapsed=%v", sid, time.Since(startTime)) done := make(chan struct{}) streamClosed := make(chan struct{}) diff --git a/internal/mux/mux.go b/internal/mux/mux.go index bf5d784..30a7934 100644 --- a/internal/mux/mux.go +++ b/internal/mux/mux.go @@ -48,7 +48,7 @@ func New(clientID uint32, onSend func([]byte) error) *Multiplexer { clientID: clientID, onSend: onSend, maxStreams: 10000, - maxBufferSize: 1024 * 1024, + maxBufferSize: 16 * 1024 * 1024, dataReady: make(map[uint16]chan struct{}), sendSeq: make(map[uint16]uint32), } @@ -83,13 +83,16 @@ func (m *Multiplexer) SendData(sid uint16, data []byte) error { m.mu.RUnlock() if !exists || stream.closed { - logger.Debug("SendData: stream %d not exists or closed", sid) return nil } - logger.Verbose("SendData: sid=%d, size=%d bytes", sid, len(data)) + const chunkSize = 7168 + totalChunks := (len(data) + chunkSize - 1) / chunkSize + + if totalChunks > 10 { + logger.Debug("SendData: sid=%d, size=%d bytes, chunks=%d", sid, len(data), totalChunks) + } - const chunkSize = 4096 for i := 0; i < len(data); i += chunkSize { end := i + chunkSize if end > len(data) { @@ -111,7 +114,6 @@ func (m *Multiplexer) SendData(sid uint16, data []byte) error { copy(frame[12:], chunk) if err := m.onSend(frame); err != nil { - logger.Debug("[MUX] sid=%d onSend error: %v", sid, err) return err } } @@ -165,8 +167,6 @@ func (m *Multiplexer) HandleFrame(frame []byte) { sid := binary.BigEndian.Uint16(frame[4:6]) length := binary.BigEndian.Uint16(frame[6:8]) seq := binary.BigEndian.Uint32(frame[8:12]) - - logger.Verbose("[MUX] HandleFrame sid=%d len=%d seq=%d", sid, length, seq) if sid == 0xFFFF && length == 0xFFFF { m.mu.Lock() @@ -236,7 +236,6 @@ func (m *Multiplexer) HandleFrame(frame []byte) { stream.recvBuf = append(stream.recvBuf, nextData...) delete(stream.outOfOrder, stream.nextSeq) stream.nextSeq++ - logger.Verbose("Applied out-of-order packet sid=%d seq=%d", sid, stream.nextSeq-1) } else { break } @@ -253,10 +252,7 @@ func (m *Multiplexer) HandleFrame(frame []byte) { } else if seq > stream.nextSeq { if len(stream.outOfOrder) < 100 { stream.outOfOrder[seq] = append([]byte(nil), data...) - logger.Verbose("Buffered out-of-order packet sid=%d seq=%d (expected %d)", sid, seq, stream.nextSeq) } - } else { - logger.Verbose("Dropped duplicate packet sid=%d seq=%d (expected %d)", sid, seq, stream.nextSeq) } } diff --git a/internal/server/server.go b/internal/server/server.go index 3dcfbd6..5486d63 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -182,8 +182,6 @@ func (s *Server) onData(data []byte) { return } - logger.Verbose("Received %d bytes from client", len(plaintext)) - if len(plaintext) >= 12 { clientID := binary.BigEndian.Uint32(plaintext[0:4]) sid := binary.BigEndian.Uint16(plaintext[4:6]) @@ -342,9 +340,7 @@ func (s *Server) handleConnect(sid uint16, req ConnectRequest) { log.Printf("[SERVER] sid=%d CONNECT_SUCCESS dial_time=%v", sid, dialElapsed) - sendStart := time.Now() s.mux.SendData(sid, []byte{0x00}) - log.Printf("[SERVER] sid=%d RESPONSE_SENT send_time=%v total_elapsed=%v", sid, time.Since(sendStart), time.Since(startTime)) go func() { defer func() { @@ -355,9 +351,15 @@ func (s *Server) handleConnect(sid uint16, req ConnectRequest) { }() buf := make([]byte, 16384) + totalSent := uint64(0) + lastLog := time.Now() + for { n, err := conn.Read(buf) if err != nil { + if totalSent > 1024*1024 { + log.Printf("[SERVER] sid=%d TRANSFER_COMPLETE total=%d MB", sid, totalSent/(1024*1024)) + } return } @@ -368,6 +370,12 @@ func (s *Server) handleConnect(sid uint16, req ConnectRequest) { if err := s.mux.SendData(sid, buf[:n]); err != nil { return } + + totalSent += uint64(n) + if time.Since(lastLog) > 5*time.Second { + log.Printf("[SERVER] sid=%d TRANSFER_PROGRESS sent=%d MB", sid, totalSent/(1024*1024)) + lastLog = time.Now() + } } }() } diff --git a/internal/telemost/peer.go b/internal/telemost/peer.go index bdac266..2ce9c3a 100644 --- a/internal/telemost/peer.go +++ b/internal/telemost/peer.go @@ -11,7 +11,6 @@ import ( "github.com/google/uuid" "github.com/gorilla/websocket" - "github.com/openlibrecommunity/olcrtc/internal/logger" "github.com/pion/webrtc/v4" ) @@ -726,30 +725,23 @@ func (p *Peer) processSendQueue(workerID int) { start := time.Now() - for p.dc.BufferedAmount() > 64*1024 { - time.Sleep(10 * time.Millisecond) - if time.Since(start) > 3*time.Second { + for p.dc.BufferedAmount() > 16*1024 { + time.Sleep(5 * time.Millisecond) + if time.Since(start) > 5*time.Second { log.Printf("[WORKER-%d] Buffer wait timeout, dropping packet size=%d", workerID, len(data)) break } } - if time.Since(start) > 3*time.Second { + if time.Since(start) > 5*time.Second { continue } - sendStart := time.Now() if err := p.dc.Send(data); err != nil { log.Printf("[WORKER-%d] Send error: %v", workerID, err) - } else { - elapsed := time.Since(sendStart) - if elapsed > 50*time.Millisecond { - log.Printf("[WORKER-%d] Sent %d bytes in %v (buffered: %d)", - workerID, len(data), elapsed, p.dc.BufferedAmount()) - } else { - logger.Verbose("[WORKER-%d] Sent %d bytes (buffered: %d)", - workerID, len(data), p.dc.BufferedAmount()) - } + } else if time.Since(start) > 100*time.Millisecond { + log.Printf("[WORKER-%d] Sent %d bytes in %v (buffered: %d)", + workerID, len(data), time.Since(start), p.dc.BufferedAmount()) } case <-p.closeCh: