From 4c6bd2b838077c4686a2bff627e9d3649aff8eb6 Mon Sep 17 00:00:00 2001 From: cyber-debug Date: Sat, 16 May 2026 00:34:39 +0300 Subject: [PATCH] feat: expose control health status --- docs/project-map.md | 7 ++- internal/client/client.go | 90 ++++++++++++++++++++++++++++++-- internal/client/client_test.go | 26 +++++++++ internal/control/control.go | 24 ++++++++- internal/control/control_test.go | 16 ++++-- internal/server/server.go | 87 +++++++++++++++++++++++++++++- internal/server/server_test.go | 26 +++++++++ 7 files changed, 266 insertions(+), 10 deletions(-) diff --git a/docs/project-map.md b/docs/project-map.md index e1b2134..4481982 100644 --- a/docs/project-map.md +++ b/docs/project-map.md @@ -164,6 +164,11 @@ Defaults are `liveness.interval: 10s`, `liveness.timeout: 5s`, and `liveness.failures: 3`. Missed pongs mark the smux session unhealthy and trigger a session rebuild/reconnect path. +Client and server runtimes also maintain a `control.Status` snapshot with +session ID, last pong time, RTT, missed pongs, reconnect count, and unhealthy +event count. Embedders can consume it through the client/server health +callbacks. + ## Registries And Plugin Shape The universal-carrier refactor centers on small registries: @@ -339,7 +344,7 @@ still the natural place for: - Server policy updates. - Graceful reconnect notifications. - Drain/start markers for failover. -- Per-session stats. +- More per-session stats. Likely files: diff --git a/internal/client/client.go b/internal/client/client.go index 13be135..001cb4c 100644 --- a/internal/client/client.go +++ b/internal/client/client.go @@ -58,6 +58,9 @@ type Client struct { controlStop context.CancelFunc sessMu sync.RWMutex reconnectMu sync.Mutex + healthMu sync.RWMutex + health control.Status + onHealth HealthFunc deviceID string sessionID string claims map[string]any @@ -66,6 +69,9 @@ type Client struct { socksPass string } +// HealthFunc is called when the client control health snapshot changes. +type HealthFunc func(control.Status) + // Config holds runtime configuration for [Run] and [RunWithReady]. type Config struct { Link string @@ -110,6 +116,9 @@ type Config struct { // Claims is sent to the server in CLIENT_HELLO and forwarded verbatim to // the server's AuthHook. Free-form key/value bag for plan, user, region, etc. Claims map[string]any + + // OnHealth receives liveness/reconnect status updates. Nil means no-op. + OnHealth HealthFunc } // Run starts the client with the given configuration. @@ -139,6 +148,7 @@ func RunWithReady(ctx context.Context, cfg Config, onReady func()) error { dnsServer: cfg.DNSServer, socksUser: cfg.SOCKSUser, socksPass: cfg.SOCKSPass, + onHealth: cfg.OnHealth, } // shutdown is registered BEFORE bringUpLink so we always close any @@ -221,7 +231,7 @@ func (c *Client) bringUpLink( if ctx.Err() != nil { return } - if !c.handleReconnect(ctx, cfg, cancel) { + if !c.handleReconnect(ctx, cfg, cancel, "carrier") { cancel() } }) @@ -249,6 +259,7 @@ func (c *Client) bringUpLink( c.controlStrm = control c.sessionID = sid c.sessMu.Unlock() + c.recordSession(sid) c.startControlLoop(ctx, cfg, cancel, control) go ln.WatchConnection(ctx) @@ -333,11 +344,12 @@ func smuxConfig() *smux.Config { return cfg } -func (c *Client) handleReconnect(ctx context.Context, cfg Config, cancel context.CancelFunc) bool { +func (c *Client) handleReconnect(ctx context.Context, cfg Config, cancel context.CancelFunc, reason string) bool { c.reconnectMu.Lock() defer c.reconnectMu.Unlock() - logger.Infof("client link reconnect - tearing down smux session") + c.recordReconnect() + logger.Infof("client reconnect reason=%s - tearing down smux session", reason) // Install a fresh muxconn immediately so onData never hits nil while // the old session is being torn down. tryReopenSession will swap it @@ -379,6 +391,7 @@ func (c *Client) handleReconnect(ctx context.Context, cfg Config, cancel context attemptDelay = 300 * time.Millisecond ) for attempt := 1; attempt <= maxAttempts; attempt++ { + logger.Infof("client reconnect attempt=%d reason=%s", attempt, reason) if c.tryReopenSession(ctx, cfg, cancel, attempt) { return true } @@ -425,6 +438,7 @@ func (c *Client) tryReopenSession( c.controlStrm = control c.sessionID = sid c.sessMu.Unlock() + c.recordSession(sid) c.startControlLoop(ctx, cfg, cancel, control) return true } @@ -442,17 +456,27 @@ func (c *Client) startControlLoop( liveness := cfg.Liveness onPong := liveness.OnPong + onMissedPong := liveness.OnMissedPong onUnhealthy := liveness.OnUnhealthy liveness.OnPong = func(h control.Health) { c.sessMu.RLock() sid := c.sessionID c.sessMu.RUnlock() + c.recordPong(h) logger.Debugf("control alive session=%s rtt=%v seq=%d", sid, h.RTT, h.Seq) if onPong != nil { onPong(h) } } + liveness.OnMissedPong = func(missed int) { + c.recordMissed(missed) + logger.Warnf("control missed pong on client: missed_pongs=%d", missed) + if onMissedPong != nil { + onMissedPong(missed) + } + } liveness.OnUnhealthy = func(missed int) { + c.recordUnhealthy(missed) logger.Warnf("control stream unhealthy on client: missed_pongs=%d", missed) if onUnhealthy != nil { onUnhealthy(missed) @@ -467,12 +491,70 @@ func (c *Client) startControlLoop( if err != nil { logger.Warnf("client control stream ended: %v", err) } - if !c.handleReconnect(ctx, cfg, cancel) { + if !c.handleReconnect(ctx, cfg, cancel, "liveness") { cancel() } }() } +// Status returns the latest client-side control health snapshot. +func (c *Client) Status() control.Status { + c.healthMu.RLock() + defer c.healthMu.RUnlock() + return c.health +} + +func (c *Client) recordSession(sessionID string) { + c.healthMu.Lock() + c.health.SessionID = sessionID + c.health.MissedPongs = 0 + status := c.health + c.healthMu.Unlock() + c.notifyHealth(status) +} + +func (c *Client) recordPong(h control.Health) { + c.healthMu.Lock() + c.health.LastPong = h.LastSeen + c.health.LastRTT = h.RTT + c.health.MissedPongs = 0 + status := c.health + c.healthMu.Unlock() + c.notifyHealth(status) +} + +func (c *Client) recordMissed(missed int) { + c.healthMu.Lock() + c.health.MissedPongs = missed + status := c.health + c.healthMu.Unlock() + c.notifyHealth(status) +} + +func (c *Client) recordUnhealthy(missed int) { + c.healthMu.Lock() + c.health.MissedPongs = missed + c.health.UnhealthyEvents++ + c.health.LastUnhealthy = time.Now() + status := c.health + c.healthMu.Unlock() + c.notifyHealth(status) +} + +func (c *Client) recordReconnect() { + c.healthMu.Lock() + c.health.Reconnects++ + status := c.health + c.healthMu.Unlock() + c.notifyHealth(status) +} + +func (c *Client) notifyHealth(status control.Status) { + if c.onHealth != nil { + c.onHealth(status) + } +} + func (c *Client) shutdown() { c.sessMu.Lock() control := c.controlStrm diff --git a/internal/client/client_test.go b/internal/client/client_test.go index f5d836b..82d0099 100644 --- a/internal/client/client_test.go +++ b/internal/client/client_test.go @@ -555,6 +555,7 @@ func TestStartControlLoopReportsPong(t *testing.T) { defer cancel() got := make(chan control.Health, 1) c := &Client{sessionID: "sid-control"} + c.recordSession("sid-control") c.startControlLoop(ctx, Config{ Liveness: control.Config{ Interval: 10 * time.Millisecond, @@ -584,4 +585,29 @@ func TestStartControlLoopReportsPong(t *testing.T) { case <-time.After(time.Second): t.Fatal("timed out waiting for control pong") } + status := c.Status() + if status.SessionID != "sid-control" { + t.Fatalf("Status.SessionID = %q, want sid-control", status.SessionID) + } + if status.LastPong.IsZero() || status.LastRTT < 0 || status.MissedPongs != 0 { + t.Fatalf("Status() = %+v", status) + } +} + +func TestStatusRecordsReconnectAndUnhealthy(t *testing.T) { + updates := 0 + c := &Client{onHealth: func(control.Status) { updates++ }} + c.recordSession("sid-1") + c.recordMissed(2) + c.recordUnhealthy(3) + c.recordReconnect() + + status := c.Status() + if status.SessionID != "sid-1" || status.MissedPongs != 3 || + status.UnhealthyEvents != 1 || status.Reconnects != 1 || status.LastUnhealthy.IsZero() { + t.Fatalf("Status() = %+v", status) + } + if updates != 4 { + t.Fatalf("health updates = %d, want 4", updates) + } } diff --git a/internal/control/control.go b/internal/control/control.go index a6bd50f..d799518 100644 --- a/internal/control/control.go +++ b/internal/control/control.go @@ -71,6 +71,18 @@ type Health struct { LastSeen time.Time } +// Status is a point-in-time view of control-stream health maintained by +// callers that embed the control loop. +type Status struct { + SessionID string + LastPong time.Time + LastRTT time.Duration + MissedPongs int + Reconnects uint64 + UnhealthyEvents uint64 + LastUnhealthy time.Time +} + // Config controls the liveness loop. type Config struct { Interval time.Duration @@ -79,6 +91,8 @@ type Config struct { // OnPong is called after a matching pong is received. OnPong func(Health) + // OnMissedPong is called when one or more outstanding pongs time out. + OnMissedPong func(missed int) // OnUnhealthy is called before Run returns [ErrUnhealthy]. OnUnhealthy func(missed int) } @@ -195,16 +209,21 @@ func (s *state) sendProbe(ctx context.Context) error { now := s.now() s.mu.Lock() + missedNow := 0 for seq, sent := range s.pending { if now.Sub(sent) < s.cfg.Timeout { continue } delete(s.pending, seq) s.failures++ + missedNow++ } + missed := s.failures if s.failures >= s.cfg.Failures { - missed := s.failures s.mu.Unlock() + if missedNow > 0 && s.cfg.OnMissedPong != nil { + s.cfg.OnMissedPong(missed) + } if s.cfg.OnUnhealthy != nil { s.cfg.OnUnhealthy(missed) } @@ -215,6 +234,9 @@ func (s *state) sendProbe(ctx context.Context) error { seq := s.nextSeq s.pending[seq] = now s.mu.Unlock() + if missedNow > 0 && s.cfg.OnMissedPong != nil { + s.cfg.OnMissedPong(missed) + } return s.enqueue(ctx, Message{ Version: ProtoVersion, diff --git a/internal/control/control_test.go b/internal/control/control_test.go index 3c52bf6..8700027 100644 --- a/internal/control/control_test.go +++ b/internal/control/control_test.go @@ -71,12 +71,19 @@ func TestRunMarksUnhealthyAfterMissedPongs(t *testing.T) { }() missedCh := make(chan int, 1) + missedCallbackCh := make(chan int, 1) errCh := make(chan error, 1) go func() { errCh <- Run(ctx, a, Config{ - Interval: 10 * time.Millisecond, - Timeout: 5 * time.Millisecond, - Failures: 2, + Interval: 10 * time.Millisecond, + Timeout: 5 * time.Millisecond, + Failures: 2, + OnMissedPong: func(missed int) { + select { + case missedCallbackCh <- missed: + default: + } + }, OnUnhealthy: func(missed int) { missedCh <- missed }, }) }() @@ -92,6 +99,9 @@ func TestRunMarksUnhealthyAfterMissedPongs(t *testing.T) { if missed := <-missedCh; missed < 2 { t.Fatalf("missed = %d, want >= 2", missed) } + if missed := <-missedCallbackCh; missed < 1 { + t.Fatalf("missed callback = %d, want >= 1", missed) + } } func TestRunRejectsBadProtocolVersion(t *testing.T) { diff --git a/internal/server/server.go b/internal/server/server.go index 4954ad4..7dae4eb 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -50,6 +50,9 @@ type SessionCloseFunc func(sessionID, reason string) // bytesIn counts client→target bytes; bytesOut counts target→client bytes. type TrafficFunc func(sessionID, addr string, bytesIn, bytesOut uint64) +// HealthFunc is called when the server control health snapshot changes. +type HealthFunc func(control.Status) + // Server handles incoming tunnel connections and proxies their traffic. type Server struct { ln link.Link @@ -59,11 +62,13 @@ type Server struct { controlStop context.CancelFunc sessMu sync.RWMutex reinstallMu sync.Mutex + healthMu sync.RWMutex wg sync.WaitGroup authHook handshake.AuthFunc onOpen SessionOpenFunc onClose SessionCloseFunc onTraffic TrafficFunc + onHealth HealthFunc deviceID string sessionID string dnsServer string @@ -71,6 +76,7 @@ type Server struct { socksProxyAddr string socksProxyPort int liveness control.Config + health control.Status } // ConnectRequest is a message from the client to establish a new connection. @@ -121,6 +127,8 @@ type Config struct { OnSessionClose SessionCloseFunc // OnTraffic fires once per tunnel stream after both copy loops finish. Nil means no-op. OnTraffic TrafficFunc + // OnHealth fires when liveness/reconnect status changes. Nil means no-op. + OnHealth HealthFunc } // Run starts the server with the given configuration. @@ -149,6 +157,10 @@ func Run(ctx context.Context, cfg Config) error { if onTraffic == nil { onTraffic = func(string, string, uint64, uint64) {} } + onHealth := cfg.OnHealth + if onHealth == nil { + onHealth = func(control.Status) {} + } s := &Server{ cipher: cipher, @@ -156,6 +168,7 @@ func Run(ctx context.Context, cfg Config) error { onOpen: onOpen, onClose: onClose, onTraffic: onTraffic, + onHealth: onHealth, dnsServer: cfg.DNSServer, socksProxyAddr: cfg.SOCKSProxyAddr, socksProxyPort: cfg.SOCKSProxyPort, @@ -315,7 +328,8 @@ func (s *Server) installSession() { } func (s *Server) handleReconnect() { - logger.Infof("server link reconnect - tearing down smux session") + s.recordReconnect() + logger.Infof("server reconnect reason=carrier - tearing down smux session") s.sessMu.RLock() current := s.session s.sessMu.RUnlock() @@ -491,6 +505,7 @@ func (s *Server) acceptHandshake(ctx context.Context, sess *smux.Session) bool { s.deviceID = hello.DeviceID s.sessionID = sid s.sessMu.Unlock() + s.recordSession(sid) s.onOpen(sid, hello.DeviceID, hello.Claims) logger.Infof("session %s opened (device=%s)", sid, hello.DeviceID) s.startControlLoop(ctx, sess, stream) @@ -505,17 +520,27 @@ func (s *Server) startControlLoop(ctx context.Context, sess *smux.Session, strea liveness := s.liveness onPong := liveness.OnPong + onMissedPong := liveness.OnMissedPong onUnhealthy := liveness.OnUnhealthy liveness.OnPong = func(h control.Health) { s.sessMu.RLock() sid := s.sessionID s.sessMu.RUnlock() + s.recordPong(h) logger.Debugf("control alive session=%s rtt=%v seq=%d", sid, h.RTT, h.Seq) if onPong != nil { onPong(h) } } + liveness.OnMissedPong = func(missed int) { + s.recordMissed(missed) + logger.Warnf("control missed pong on server: missed_pongs=%d", missed) + if onMissedPong != nil { + onMissedPong(missed) + } + } liveness.OnUnhealthy = func(missed int) { + s.recordUnhealthy(missed) logger.Warnf("control stream unhealthy on server: missed_pongs=%d", missed) if onUnhealthy != nil { onUnhealthy(missed) @@ -533,10 +558,70 @@ func (s *Server) startControlLoop(ctx context.Context, sess *smux.Session, strea if err != nil { logger.Warnf("server control stream ended: %v", err) } + s.recordReconnect() + logger.Infof("server reconnect reason=liveness - reinstalling smux session") s.reinstallSession(sess) }() } +// Status returns the latest server-side control health snapshot. +func (s *Server) Status() control.Status { + s.healthMu.RLock() + defer s.healthMu.RUnlock() + return s.health +} + +func (s *Server) recordSession(sessionID string) { + s.healthMu.Lock() + s.health.SessionID = sessionID + s.health.MissedPongs = 0 + status := s.health + s.healthMu.Unlock() + s.notifyHealth(status) +} + +func (s *Server) recordPong(h control.Health) { + s.healthMu.Lock() + s.health.LastPong = h.LastSeen + s.health.LastRTT = h.RTT + s.health.MissedPongs = 0 + status := s.health + s.healthMu.Unlock() + s.notifyHealth(status) +} + +func (s *Server) recordMissed(missed int) { + s.healthMu.Lock() + s.health.MissedPongs = missed + status := s.health + s.healthMu.Unlock() + s.notifyHealth(status) +} + +func (s *Server) recordUnhealthy(missed int) { + s.healthMu.Lock() + s.health.MissedPongs = missed + s.health.UnhealthyEvents++ + s.health.LastUnhealthy = time.Now() + status := s.health + s.healthMu.Unlock() + s.notifyHealth(status) +} + +func (s *Server) recordReconnect() { + s.healthMu.Lock() + s.health.Reconnects++ + status := s.health + s.healthMu.Unlock() + s.notifyHealth(status) +} + +func (s *Server) notifyHealth(status control.Status) { + if s.onHealth != nil { + s.onHealth(status) + } +} + func (s *Server) shutdown() { s.closeSession() if s.ln != nil { diff --git a/internal/server/server_test.go b/internal/server/server_test.go index d5a6f6d..dc80b21 100644 --- a/internal/server/server_test.go +++ b/internal/server/server_test.go @@ -422,6 +422,7 @@ func TestStartControlLoopReportsPong(t *testing.T) { }, }, } + s.recordSession("sid-control") defer func() { cancel() s.wg.Wait() @@ -443,6 +444,31 @@ func TestStartControlLoopReportsPong(t *testing.T) { case <-time.After(time.Second): t.Fatal("timed out waiting for control pong") } + status := s.Status() + if status.SessionID != "sid-control" { + t.Fatalf("Status.SessionID = %q, want sid-control", status.SessionID) + } + if status.LastPong.IsZero() || status.LastRTT < 0 || status.MissedPongs != 0 { + t.Fatalf("Status() = %+v", status) + } +} + +func TestStatusRecordsReconnectAndUnhealthy(t *testing.T) { + updates := 0 + s := &Server{onHealth: func(control.Status) { updates++ }} + s.recordSession("sid-1") + s.recordMissed(2) + s.recordUnhealthy(3) + s.recordReconnect() + + status := s.Status() + if status.SessionID != "sid-1" || status.MissedPongs != 3 || + status.UnhealthyEvents != 1 || status.Reconnects != 1 || status.LastUnhealthy.IsZero() { + t.Fatalf("Status() = %+v", status) + } + if updates != 4 { + t.Fatalf("health updates = %d, want 4", updates) + } } //nolint:cyclop // integration-style test needs setup, proxying, and traffic assertions together.