mirror of
https://github.com/openlibrecommunity/olcrtc.git
synced 2026-06-09 05:44:43 +00:00
refactor: simplify and clean up logging messages
This commit is contained in:
@@ -104,9 +104,7 @@ func RunWithReady(
|
||||
|
||||
err = c.runSOCKS5(runCtx, socksHost, socksPort, socksUser, socksPass, onReady)
|
||||
|
||||
log.Println("Waiting for client goroutines...")
|
||||
c.wg.Wait()
|
||||
log.Println("Client goroutines finished")
|
||||
|
||||
return err
|
||||
}
|
||||
@@ -222,15 +220,13 @@ func (c *Client) addPeer(
|
||||
|
||||
func (c *Client) onReconnect(peerID int, dc *webrtc.DataChannel) {
|
||||
if dc == nil {
|
||||
log.Printf("Client peer %d channel closed - resetting multiplexer state", peerID)
|
||||
log.Printf("peer %d channel closed", peerID)
|
||||
} else {
|
||||
log.Printf("Client peer %d reconnected - resetting multiplexer state", peerID)
|
||||
log.Printf("peer %d reconnected", peerID)
|
||||
}
|
||||
|
||||
c.mux.UpdateSendFunc(c.sendFrame)
|
||||
c.mux.Reset()
|
||||
|
||||
log.Println("Client multiplexer reset complete")
|
||||
}
|
||||
|
||||
func (c *Client) sendResetSignal() {
|
||||
@@ -286,7 +282,6 @@ func (c *Client) runSOCKS5(
|
||||
|
||||
go func() {
|
||||
<-ctx.Done()
|
||||
log.Println("Closing SOCKS5 listener...")
|
||||
if err := listener.Close(); err != nil {
|
||||
logger.Debugf("SOCKS5 listener close error: %v", err)
|
||||
}
|
||||
@@ -297,11 +292,10 @@ func (c *Client) runSOCKS5(
|
||||
if err != nil {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
log.Println("SOCKS5 listener closed")
|
||||
c.closePeers()
|
||||
return nil
|
||||
default:
|
||||
log.Printf("Accept error: %v", err)
|
||||
log.Printf("accept error: %v", err)
|
||||
continue
|
||||
}
|
||||
}
|
||||
@@ -359,7 +353,7 @@ func (c *Client) handleSOCKS5(conn net.Conn, username, password string) {
|
||||
|
||||
sid := c.mux.OpenStream()
|
||||
logger.Verbosef("SOCKS5 opened stream sid=%d for %s:%d", sid, addr, port)
|
||||
log.Printf("[CLIENT] sid=%d SOCKS5_START %s:%d", sid, addr, port)
|
||||
log.Printf("sid=%d socks5 %s:%d", sid, addr, port)
|
||||
|
||||
if !c.sendConnectRequest(sid, addr, port) {
|
||||
return
|
||||
|
||||
@@ -107,9 +107,7 @@ func Run(
|
||||
|
||||
err = s.runLoop(runCtx)
|
||||
|
||||
log.Println("Waiting for server goroutines...")
|
||||
s.wg.Wait()
|
||||
log.Println("Server goroutines finished")
|
||||
|
||||
return err
|
||||
}
|
||||
@@ -220,9 +218,9 @@ func (s *Server) addPeer(ctx context.Context, roomURL string, peerID int, cancel
|
||||
|
||||
func (s *Server) handlePeerReconnect(peerID int, dc *webrtc.DataChannel) {
|
||||
if dc == nil {
|
||||
log.Printf("Server peer %d channel closed - resetting mux state", peerID)
|
||||
log.Printf("peer %d channel closed", peerID)
|
||||
} else {
|
||||
log.Printf("Server peer %d reconnected - resetting mux state", peerID)
|
||||
log.Printf("peer %d reconnected", peerID)
|
||||
}
|
||||
|
||||
s.connMu.Lock()
|
||||
@@ -249,7 +247,6 @@ func (s *Server) handlePeerReconnect(peerID int, dc *webrtc.DataChannel) {
|
||||
}
|
||||
|
||||
s.mux.Reset()
|
||||
log.Println("Server multiplexer reset complete")
|
||||
}
|
||||
|
||||
func (s *Server) socks5Connect(conn net.Conn, targetAddr string, targetPort int) error {
|
||||
@@ -337,7 +334,6 @@ func (s *Server) runLoop(ctx context.Context) error {
|
||||
}
|
||||
|
||||
func (s *Server) shutdown() {
|
||||
log.Println("Server shutting down...")
|
||||
s.connMu.Lock()
|
||||
for _, conn := range s.connections {
|
||||
if conn != nil {
|
||||
@@ -347,10 +343,9 @@ func (s *Server) shutdown() {
|
||||
s.connMu.Unlock()
|
||||
|
||||
for i, peer := range s.peers {
|
||||
log.Printf("Closing peer %d...", i)
|
||||
log.Printf("closing peer %d", i)
|
||||
_ = peer.Close()
|
||||
}
|
||||
log.Println("All peers closed")
|
||||
}
|
||||
|
||||
func (s *Server) processMuxStreams(ctx context.Context) {
|
||||
@@ -372,7 +367,7 @@ func (s *Server) processMuxStreams(ctx context.Context) {
|
||||
|
||||
var req ConnectRequest
|
||||
if err := json.Unmarshal(data, &req); err == nil && req.Cmd == "connect" {
|
||||
log.Printf("[SERVER] sid=%d RECV_CONNECT %s:%d", sid, req.Addr, req.Port)
|
||||
log.Printf("sid=%d connect %s:%d", sid, req.Addr, req.Port)
|
||||
s.closeStreamConnection(sid)
|
||||
go s.handleConnect(ctx, sid, req)
|
||||
}
|
||||
@@ -426,9 +421,7 @@ func (s *Server) unmarkStreamPump(sid uint16, conn net.Conn) {
|
||||
}
|
||||
|
||||
func (s *Server) handleConnect(ctx context.Context, sid uint16, req ConnectRequest) {
|
||||
startTime := time.Now()
|
||||
addr := net.JoinHostPort(req.Addr, strconv.Itoa(req.Port))
|
||||
log.Printf("[SERVER] sid=%d CONNECT_START %s", sid, addr)
|
||||
|
||||
s.closeStreamConnection(sid)
|
||||
|
||||
@@ -437,8 +430,7 @@ func (s *Server) handleConnect(ctx context.Context, sid uint16, req ConnectReque
|
||||
dialElapsed := time.Since(dialStart)
|
||||
|
||||
if err != nil {
|
||||
log.Printf("[SERVER] sid=%d CONNECT_FAILED dial=%v total=%v err=%v",
|
||||
sid, dialElapsed, time.Since(startTime), err)
|
||||
log.Printf("sid=%d dial %s failed (%v): %v", sid, addr, dialElapsed, err)
|
||||
_ = s.mux.CloseStream(sid)
|
||||
return
|
||||
}
|
||||
@@ -447,7 +439,7 @@ func (s *Server) handleConnect(ctx context.Context, sid uint16, req ConnectReque
|
||||
s.connections[sid] = conn
|
||||
s.connMu.Unlock()
|
||||
|
||||
log.Printf("[SERVER] sid=%d CONNECT_SUCCESS dial=%v", sid, dialElapsed)
|
||||
log.Printf("sid=%d connected %s in %v", sid, addr, dialElapsed)
|
||||
|
||||
s.activeClients.Add(1)
|
||||
_ = s.mux.SendData(sid, []byte{0x00})
|
||||
@@ -505,7 +497,7 @@ func (s *Server) pumpToMux(sid uint16, conn net.Conn) {
|
||||
n, err := conn.Read(buf)
|
||||
if err != nil {
|
||||
if totalSent > 1024*1024 {
|
||||
log.Printf("[SERVER] sid=%d TRANSFER_DONE total=%d MB", sid, totalSent/(1024*1024))
|
||||
log.Printf("sid=%d done total=%dMB", sid, totalSent/(1024*1024))
|
||||
}
|
||||
return
|
||||
}
|
||||
@@ -520,7 +512,7 @@ func (s *Server) pumpToMux(sid uint16, conn net.Conn) {
|
||||
|
||||
totalSent += uint64(n) //nolint:gosec
|
||||
if time.Since(lastLog) > 5*time.Second {
|
||||
log.Printf("[SERVER] sid=%d TRANSFER_UP sent=%d MB", sid, totalSent/(1024*1024))
|
||||
log.Printf("sid=%d sent=%dMB", sid, totalSent/(1024*1024))
|
||||
lastLog = time.Now()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -148,7 +148,6 @@ func (p *Peer) queueReconnect() {
|
||||
return
|
||||
}
|
||||
if p.shouldReconnect != nil && !p.shouldReconnect() {
|
||||
log.Println("Reconnect skipped: shouldReconnect returned false")
|
||||
return
|
||||
}
|
||||
select {
|
||||
@@ -248,7 +247,6 @@ func (p *Peer) setupPeerConnections(config webrtc.Configuration) error {
|
||||
}
|
||||
|
||||
func (p *Peer) onConnectionStateChange(state webrtc.PeerConnectionState) {
|
||||
log.Printf("PeerConnection state: %s", state.String())
|
||||
if !p.closed.Load() && (state == webrtc.PeerConnectionStateFailed ||
|
||||
state == webrtc.PeerConnectionStateDisconnected) {
|
||||
p.queueReconnect()
|
||||
@@ -257,7 +255,6 @@ func (p *Peer) onConnectionStateChange(state webrtc.PeerConnectionState) {
|
||||
|
||||
func (p *Peer) setupDataChannelHandlers(dcReady chan struct{}, sessionCloseCh chan struct{}) {
|
||||
p.dc.OnOpen(func() {
|
||||
log.Println("DataChannel opened")
|
||||
numWorkers := 4
|
||||
for i := range numWorkers {
|
||||
p.wg.Add(1)
|
||||
@@ -278,7 +275,6 @@ func (p *Peer) setupDataChannelHandlers(dcReady chan struct{}, sessionCloseCh ch
|
||||
p.dc.OnMessage(p.onDataChannelMessage)
|
||||
|
||||
p.pcSub.OnDataChannel(func(dc *webrtc.DataChannel) {
|
||||
log.Printf("Received datachannel: %s", dc.Label())
|
||||
dc.OnClose(func() {
|
||||
if !p.closed.Load() {
|
||||
p.queueReconnect()
|
||||
@@ -289,7 +285,6 @@ func (p *Peer) setupDataChannelHandlers(dcReady chan struct{}, sessionCloseCh ch
|
||||
}
|
||||
|
||||
func (p *Peer) onDataChannelClose() {
|
||||
log.Println("DataChannel closed")
|
||||
if p.onReconnect != nil {
|
||||
p.onReconnect(nil)
|
||||
}
|
||||
@@ -355,8 +350,6 @@ func (p *Peer) Send(data []byte) error { //nolint:revive
|
||||
case p.sendQueue <- data:
|
||||
return nil
|
||||
case <-time.After(50 * time.Millisecond):
|
||||
queueLen := len(p.sendQueue)
|
||||
log.Printf("[SEND_QUEUE] Timeout! len=%d size=%d", queueLen, len(data))
|
||||
return ErrSendQueueTimeout
|
||||
}
|
||||
}
|
||||
@@ -415,7 +408,7 @@ func (p *Peer) handleSignaling(ctx context.Context) { //nolint:cyclop
|
||||
for {
|
||||
var msg map[string]interface{}
|
||||
if err := p.ws.ReadJSON(&msg); err != nil {
|
||||
log.Printf("WS read error: %v", err)
|
||||
logger.Debugf("ws read error: %v", err)
|
||||
if !p.closed.Load() {
|
||||
p.queueReconnect()
|
||||
}
|
||||
@@ -443,7 +436,7 @@ func (p *Peer) handleSignaling(ctx context.Context) { //nolint:cyclop
|
||||
|
||||
if offer, ok := msg["subscriberSdpOffer"].(map[string]interface{}); ok && !pubSent {
|
||||
if err := p.handleSdpOffer(offer, uid); err != nil {
|
||||
log.Printf("SDP offer error: %v", err)
|
||||
logger.Debugf("sdp offer error: %v", err)
|
||||
continue
|
||||
}
|
||||
pubSent = true
|
||||
@@ -542,7 +535,7 @@ func (p *Peer) handleSdpAnswer(answer map[string]interface{}, uid string) {
|
||||
Type: webrtc.SDPTypeAnswer,
|
||||
SDP: sdp,
|
||||
}); err != nil {
|
||||
log.Printf("SetRemoteDescription error: %v", err)
|
||||
logger.Debugf("SetRemoteDescription error: %v", err)
|
||||
}
|
||||
p.sendAck(uid)
|
||||
}
|
||||
@@ -744,7 +737,6 @@ func (p *Peer) sendTelemetry(ctx context.Context, endpoint, event string) {
|
||||
}
|
||||
|
||||
func (p *Peer) signalEnded(reason string) {
|
||||
log.Printf("Conference ended: %s", reason)
|
||||
p.closed.Store(true)
|
||||
p.stopTelemetry()
|
||||
if p.onEnded != nil {
|
||||
@@ -837,16 +829,13 @@ func (p *Peer) sendLeave(uid string) bool {
|
||||
}
|
||||
|
||||
if err := p.ws.WriteJSON(leave); err != nil {
|
||||
log.Printf("Failed to send leave: %v", err)
|
||||
return false
|
||||
}
|
||||
log.Println("Sent leave message")
|
||||
return true
|
||||
}
|
||||
|
||||
// Close closes the peer connection and cleans up resources.
|
||||
func (p *Peer) Close() error {
|
||||
log.Println("Closing peer...")
|
||||
alreadyClosing := p.closed.Swap(true)
|
||||
p.sendQueueClosed.Store(true)
|
||||
|
||||
@@ -872,7 +861,6 @@ func (p *Peer) Close() error {
|
||||
select {
|
||||
case <-done:
|
||||
case <-time.After(2 * time.Second):
|
||||
log.Println("Wait timeout")
|
||||
}
|
||||
|
||||
if p.dc != nil {
|
||||
@@ -925,7 +913,7 @@ func (p *Peer) sendWSPing() bool {
|
||||
defer p.wsMu.Unlock()
|
||||
if p.ws != nil {
|
||||
if err := p.ws.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(10*time.Second)); err != nil {
|
||||
log.Printf("WS Ping error: %v", err)
|
||||
logger.Debugf("ws ping error: %v", err)
|
||||
p.queueReconnect()
|
||||
return false
|
||||
}
|
||||
@@ -941,7 +929,7 @@ func (p *Peer) sendAppPing() bool {
|
||||
"uid": uuid.New().String(),
|
||||
"ping": map[string]interface{}{},
|
||||
}); err != nil {
|
||||
log.Printf("App Ping error: %v", err)
|
||||
logger.Debugf("app ping error: %v", err)
|
||||
p.queueReconnect()
|
||||
return false
|
||||
}
|
||||
@@ -1019,7 +1007,6 @@ func (p *Peer) WatchConnection(ctx context.Context) { //nolint:revive,cyclop
|
||||
p.lastReconnect = time.Now()
|
||||
|
||||
if p.reconnectCount > maxReconnects {
|
||||
log.Printf("Max reconnects reached (%d)", maxReconnects)
|
||||
p.signalEnded("reconnect limit reached")
|
||||
return
|
||||
}
|
||||
@@ -1031,7 +1018,7 @@ func (p *Peer) WatchConnection(ctx context.Context) { //nolint:revive,cyclop
|
||||
|
||||
for {
|
||||
if err := p.reconnect(ctx); err != nil {
|
||||
log.Printf("Reconnect failed: %v", err)
|
||||
logger.Debugf("reconnect failed: %v", err)
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
@@ -1056,8 +1043,7 @@ func (p *Peer) processSendQueue(workerID int, sessionCloseCh <-chan struct{}) {
|
||||
return
|
||||
case data := <-p.sendQueue:
|
||||
if len(data) > p.trafficShape.MaxMessageSize {
|
||||
log.Printf("[WORKER-%d] Refusing oversized message size=%d limit=%d",
|
||||
workerID, len(data), p.trafficShape.MaxMessageSize)
|
||||
logger.Debugf("oversized message size=%d limit=%d", len(data), p.trafficShape.MaxMessageSize)
|
||||
continue
|
||||
}
|
||||
|
||||
@@ -1070,7 +1056,7 @@ func (p *Peer) processSendQueue(workerID int, sessionCloseCh <-chan struct{}) {
|
||||
}
|
||||
|
||||
if err := p.dc.Send(data); err != nil {
|
||||
log.Printf("[WORKER-%d] Send error: %v", workerID, err)
|
||||
logger.Debugf("send error: %v", err)
|
||||
p.queueReconnect()
|
||||
return
|
||||
}
|
||||
@@ -1092,7 +1078,7 @@ func (p *Peer) waitBufferedAmount(workerID int, sessionCloseCh <-chan struct{})
|
||||
return 0, ErrPeerClosed
|
||||
case <-time.After(10 * time.Millisecond):
|
||||
if time.Since(start) > 5*time.Second {
|
||||
log.Printf("[WORKER-%d] Buffer wait timeout", workerID)
|
||||
logger.Debugf("buffer wait timeout worker=%d", workerID)
|
||||
return time.Since(start), nil
|
||||
}
|
||||
}
|
||||
@@ -1124,8 +1110,7 @@ func (p *Peer) monitorQueue(sessionCloseCh <-chan struct{}) {
|
||||
queueLen := len(p.sendQueue)
|
||||
buffered := p.dc.BufferedAmount()
|
||||
if queueLen > 100 || buffered > 1024*1024 {
|
||||
log.Printf("[MONITOR] queue=%d, buffered=%d MB",
|
||||
queueLen, buffered/(1024*1024))
|
||||
log.Printf("queue=%d buf=%dMB", queueLen, buffered/(1024*1024))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user