feat: expose control health status

This commit is contained in:
cyber-debug
2026-05-16 00:34:39 +03:00
parent d16cd0686a
commit 4c6bd2b838
7 changed files with 266 additions and 10 deletions

View File

@@ -164,6 +164,11 @@ Defaults are `liveness.interval: 10s`, `liveness.timeout: 5s`, and
`liveness.failures: 3`. Missed pongs mark the smux session unhealthy and
trigger a session rebuild/reconnect path.
Client and server runtimes also maintain a `control.Status` snapshot with
session ID, last pong time, RTT, missed pongs, reconnect count, and unhealthy
event count. Embedders can consume it through the client/server health
callbacks.
## Registries And Plugin Shape
The universal-carrier refactor centers on small registries:
@@ -339,7 +344,7 @@ still the natural place for:
- Server policy updates.
- Graceful reconnect notifications.
- Drain/start markers for failover.
- Per-session stats.
- More per-session stats.
Likely files:

View File

@@ -58,6 +58,9 @@ type Client struct {
controlStop context.CancelFunc
sessMu sync.RWMutex
reconnectMu sync.Mutex
healthMu sync.RWMutex
health control.Status
onHealth HealthFunc
deviceID string
sessionID string
claims map[string]any
@@ -66,6 +69,9 @@ type Client struct {
socksPass string
}
// HealthFunc is called when the client control health snapshot changes.
type HealthFunc func(control.Status)
// Config holds runtime configuration for [Run] and [RunWithReady].
type Config struct {
Link string
@@ -110,6 +116,9 @@ type Config struct {
// Claims is sent to the server in CLIENT_HELLO and forwarded verbatim to
// the server's AuthHook. Free-form key/value bag for plan, user, region, etc.
Claims map[string]any
// OnHealth receives liveness/reconnect status updates. Nil means no-op.
OnHealth HealthFunc
}
// Run starts the client with the given configuration.
@@ -139,6 +148,7 @@ func RunWithReady(ctx context.Context, cfg Config, onReady func()) error {
dnsServer: cfg.DNSServer,
socksUser: cfg.SOCKSUser,
socksPass: cfg.SOCKSPass,
onHealth: cfg.OnHealth,
}
// shutdown is registered BEFORE bringUpLink so we always close any
@@ -221,7 +231,7 @@ func (c *Client) bringUpLink(
if ctx.Err() != nil {
return
}
if !c.handleReconnect(ctx, cfg, cancel) {
if !c.handleReconnect(ctx, cfg, cancel, "carrier") {
cancel()
}
})
@@ -249,6 +259,7 @@ func (c *Client) bringUpLink(
c.controlStrm = control
c.sessionID = sid
c.sessMu.Unlock()
c.recordSession(sid)
c.startControlLoop(ctx, cfg, cancel, control)
go ln.WatchConnection(ctx)
@@ -333,11 +344,12 @@ func smuxConfig() *smux.Config {
return cfg
}
func (c *Client) handleReconnect(ctx context.Context, cfg Config, cancel context.CancelFunc) bool {
func (c *Client) handleReconnect(ctx context.Context, cfg Config, cancel context.CancelFunc, reason string) bool {
c.reconnectMu.Lock()
defer c.reconnectMu.Unlock()
logger.Infof("client link reconnect - tearing down smux session")
c.recordReconnect()
logger.Infof("client reconnect reason=%s - tearing down smux session", reason)
// Install a fresh muxconn immediately so onData never hits nil while
// the old session is being torn down. tryReopenSession will swap it
@@ -379,6 +391,7 @@ func (c *Client) handleReconnect(ctx context.Context, cfg Config, cancel context
attemptDelay = 300 * time.Millisecond
)
for attempt := 1; attempt <= maxAttempts; attempt++ {
logger.Infof("client reconnect attempt=%d reason=%s", attempt, reason)
if c.tryReopenSession(ctx, cfg, cancel, attempt) {
return true
}
@@ -425,6 +438,7 @@ func (c *Client) tryReopenSession(
c.controlStrm = control
c.sessionID = sid
c.sessMu.Unlock()
c.recordSession(sid)
c.startControlLoop(ctx, cfg, cancel, control)
return true
}
@@ -442,17 +456,27 @@ func (c *Client) startControlLoop(
liveness := cfg.Liveness
onPong := liveness.OnPong
onMissedPong := liveness.OnMissedPong
onUnhealthy := liveness.OnUnhealthy
liveness.OnPong = func(h control.Health) {
c.sessMu.RLock()
sid := c.sessionID
c.sessMu.RUnlock()
c.recordPong(h)
logger.Debugf("control alive session=%s rtt=%v seq=%d", sid, h.RTT, h.Seq)
if onPong != nil {
onPong(h)
}
}
liveness.OnMissedPong = func(missed int) {
c.recordMissed(missed)
logger.Warnf("control missed pong on client: missed_pongs=%d", missed)
if onMissedPong != nil {
onMissedPong(missed)
}
}
liveness.OnUnhealthy = func(missed int) {
c.recordUnhealthy(missed)
logger.Warnf("control stream unhealthy on client: missed_pongs=%d", missed)
if onUnhealthy != nil {
onUnhealthy(missed)
@@ -467,12 +491,70 @@ func (c *Client) startControlLoop(
if err != nil {
logger.Warnf("client control stream ended: %v", err)
}
if !c.handleReconnect(ctx, cfg, cancel) {
if !c.handleReconnect(ctx, cfg, cancel, "liveness") {
cancel()
}
}()
}
// Status returns the latest client-side control health snapshot.
func (c *Client) Status() control.Status {
c.healthMu.RLock()
defer c.healthMu.RUnlock()
return c.health
}
func (c *Client) recordSession(sessionID string) {
c.healthMu.Lock()
c.health.SessionID = sessionID
c.health.MissedPongs = 0
status := c.health
c.healthMu.Unlock()
c.notifyHealth(status)
}
func (c *Client) recordPong(h control.Health) {
c.healthMu.Lock()
c.health.LastPong = h.LastSeen
c.health.LastRTT = h.RTT
c.health.MissedPongs = 0
status := c.health
c.healthMu.Unlock()
c.notifyHealth(status)
}
func (c *Client) recordMissed(missed int) {
c.healthMu.Lock()
c.health.MissedPongs = missed
status := c.health
c.healthMu.Unlock()
c.notifyHealth(status)
}
func (c *Client) recordUnhealthy(missed int) {
c.healthMu.Lock()
c.health.MissedPongs = missed
c.health.UnhealthyEvents++
c.health.LastUnhealthy = time.Now()
status := c.health
c.healthMu.Unlock()
c.notifyHealth(status)
}
func (c *Client) recordReconnect() {
c.healthMu.Lock()
c.health.Reconnects++
status := c.health
c.healthMu.Unlock()
c.notifyHealth(status)
}
func (c *Client) notifyHealth(status control.Status) {
if c.onHealth != nil {
c.onHealth(status)
}
}
func (c *Client) shutdown() {
c.sessMu.Lock()
control := c.controlStrm

View File

@@ -555,6 +555,7 @@ func TestStartControlLoopReportsPong(t *testing.T) {
defer cancel()
got := make(chan control.Health, 1)
c := &Client{sessionID: "sid-control"}
c.recordSession("sid-control")
c.startControlLoop(ctx, Config{
Liveness: control.Config{
Interval: 10 * time.Millisecond,
@@ -584,4 +585,29 @@ func TestStartControlLoopReportsPong(t *testing.T) {
case <-time.After(time.Second):
t.Fatal("timed out waiting for control pong")
}
status := c.Status()
if status.SessionID != "sid-control" {
t.Fatalf("Status.SessionID = %q, want sid-control", status.SessionID)
}
if status.LastPong.IsZero() || status.LastRTT < 0 || status.MissedPongs != 0 {
t.Fatalf("Status() = %+v", status)
}
}
func TestStatusRecordsReconnectAndUnhealthy(t *testing.T) {
updates := 0
c := &Client{onHealth: func(control.Status) { updates++ }}
c.recordSession("sid-1")
c.recordMissed(2)
c.recordUnhealthy(3)
c.recordReconnect()
status := c.Status()
if status.SessionID != "sid-1" || status.MissedPongs != 3 ||
status.UnhealthyEvents != 1 || status.Reconnects != 1 || status.LastUnhealthy.IsZero() {
t.Fatalf("Status() = %+v", status)
}
if updates != 4 {
t.Fatalf("health updates = %d, want 4", updates)
}
}

View File

@@ -71,6 +71,18 @@ type Health struct {
LastSeen time.Time
}
// Status is a point-in-time view of control-stream health maintained by
// callers that embed the control loop.
type Status struct {
SessionID string
LastPong time.Time
LastRTT time.Duration
MissedPongs int
Reconnects uint64
UnhealthyEvents uint64
LastUnhealthy time.Time
}
// Config controls the liveness loop.
type Config struct {
Interval time.Duration
@@ -79,6 +91,8 @@ type Config struct {
// OnPong is called after a matching pong is received.
OnPong func(Health)
// OnMissedPong is called when one or more outstanding pongs time out.
OnMissedPong func(missed int)
// OnUnhealthy is called before Run returns [ErrUnhealthy].
OnUnhealthy func(missed int)
}
@@ -195,16 +209,21 @@ func (s *state) sendProbe(ctx context.Context) error {
now := s.now()
s.mu.Lock()
missedNow := 0
for seq, sent := range s.pending {
if now.Sub(sent) < s.cfg.Timeout {
continue
}
delete(s.pending, seq)
s.failures++
missedNow++
}
missed := s.failures
if s.failures >= s.cfg.Failures {
missed := s.failures
s.mu.Unlock()
if missedNow > 0 && s.cfg.OnMissedPong != nil {
s.cfg.OnMissedPong(missed)
}
if s.cfg.OnUnhealthy != nil {
s.cfg.OnUnhealthy(missed)
}
@@ -215,6 +234,9 @@ func (s *state) sendProbe(ctx context.Context) error {
seq := s.nextSeq
s.pending[seq] = now
s.mu.Unlock()
if missedNow > 0 && s.cfg.OnMissedPong != nil {
s.cfg.OnMissedPong(missed)
}
return s.enqueue(ctx, Message{
Version: ProtoVersion,

View File

@@ -71,12 +71,19 @@ func TestRunMarksUnhealthyAfterMissedPongs(t *testing.T) {
}()
missedCh := make(chan int, 1)
missedCallbackCh := make(chan int, 1)
errCh := make(chan error, 1)
go func() {
errCh <- Run(ctx, a, Config{
Interval: 10 * time.Millisecond,
Timeout: 5 * time.Millisecond,
Failures: 2,
Interval: 10 * time.Millisecond,
Timeout: 5 * time.Millisecond,
Failures: 2,
OnMissedPong: func(missed int) {
select {
case missedCallbackCh <- missed:
default:
}
},
OnUnhealthy: func(missed int) { missedCh <- missed },
})
}()
@@ -92,6 +99,9 @@ func TestRunMarksUnhealthyAfterMissedPongs(t *testing.T) {
if missed := <-missedCh; missed < 2 {
t.Fatalf("missed = %d, want >= 2", missed)
}
if missed := <-missedCallbackCh; missed < 1 {
t.Fatalf("missed callback = %d, want >= 1", missed)
}
}
func TestRunRejectsBadProtocolVersion(t *testing.T) {

View File

@@ -50,6 +50,9 @@ type SessionCloseFunc func(sessionID, reason string)
// bytesIn counts client→target bytes; bytesOut counts target→client bytes.
type TrafficFunc func(sessionID, addr string, bytesIn, bytesOut uint64)
// HealthFunc is called when the server control health snapshot changes.
type HealthFunc func(control.Status)
// Server handles incoming tunnel connections and proxies their traffic.
type Server struct {
ln link.Link
@@ -59,11 +62,13 @@ type Server struct {
controlStop context.CancelFunc
sessMu sync.RWMutex
reinstallMu sync.Mutex
healthMu sync.RWMutex
wg sync.WaitGroup
authHook handshake.AuthFunc
onOpen SessionOpenFunc
onClose SessionCloseFunc
onTraffic TrafficFunc
onHealth HealthFunc
deviceID string
sessionID string
dnsServer string
@@ -71,6 +76,7 @@ type Server struct {
socksProxyAddr string
socksProxyPort int
liveness control.Config
health control.Status
}
// ConnectRequest is a message from the client to establish a new connection.
@@ -121,6 +127,8 @@ type Config struct {
OnSessionClose SessionCloseFunc
// OnTraffic fires once per tunnel stream after both copy loops finish. Nil means no-op.
OnTraffic TrafficFunc
// OnHealth fires when liveness/reconnect status changes. Nil means no-op.
OnHealth HealthFunc
}
// Run starts the server with the given configuration.
@@ -149,6 +157,10 @@ func Run(ctx context.Context, cfg Config) error {
if onTraffic == nil {
onTraffic = func(string, string, uint64, uint64) {}
}
onHealth := cfg.OnHealth
if onHealth == nil {
onHealth = func(control.Status) {}
}
s := &Server{
cipher: cipher,
@@ -156,6 +168,7 @@ func Run(ctx context.Context, cfg Config) error {
onOpen: onOpen,
onClose: onClose,
onTraffic: onTraffic,
onHealth: onHealth,
dnsServer: cfg.DNSServer,
socksProxyAddr: cfg.SOCKSProxyAddr,
socksProxyPort: cfg.SOCKSProxyPort,
@@ -315,7 +328,8 @@ func (s *Server) installSession() {
}
func (s *Server) handleReconnect() {
logger.Infof("server link reconnect - tearing down smux session")
s.recordReconnect()
logger.Infof("server reconnect reason=carrier - tearing down smux session")
s.sessMu.RLock()
current := s.session
s.sessMu.RUnlock()
@@ -491,6 +505,7 @@ func (s *Server) acceptHandshake(ctx context.Context, sess *smux.Session) bool {
s.deviceID = hello.DeviceID
s.sessionID = sid
s.sessMu.Unlock()
s.recordSession(sid)
s.onOpen(sid, hello.DeviceID, hello.Claims)
logger.Infof("session %s opened (device=%s)", sid, hello.DeviceID)
s.startControlLoop(ctx, sess, stream)
@@ -505,17 +520,27 @@ func (s *Server) startControlLoop(ctx context.Context, sess *smux.Session, strea
liveness := s.liveness
onPong := liveness.OnPong
onMissedPong := liveness.OnMissedPong
onUnhealthy := liveness.OnUnhealthy
liveness.OnPong = func(h control.Health) {
s.sessMu.RLock()
sid := s.sessionID
s.sessMu.RUnlock()
s.recordPong(h)
logger.Debugf("control alive session=%s rtt=%v seq=%d", sid, h.RTT, h.Seq)
if onPong != nil {
onPong(h)
}
}
liveness.OnMissedPong = func(missed int) {
s.recordMissed(missed)
logger.Warnf("control missed pong on server: missed_pongs=%d", missed)
if onMissedPong != nil {
onMissedPong(missed)
}
}
liveness.OnUnhealthy = func(missed int) {
s.recordUnhealthy(missed)
logger.Warnf("control stream unhealthy on server: missed_pongs=%d", missed)
if onUnhealthy != nil {
onUnhealthy(missed)
@@ -533,10 +558,70 @@ func (s *Server) startControlLoop(ctx context.Context, sess *smux.Session, strea
if err != nil {
logger.Warnf("server control stream ended: %v", err)
}
s.recordReconnect()
logger.Infof("server reconnect reason=liveness - reinstalling smux session")
s.reinstallSession(sess)
}()
}
// Status returns the latest server-side control health snapshot.
func (s *Server) Status() control.Status {
s.healthMu.RLock()
defer s.healthMu.RUnlock()
return s.health
}
func (s *Server) recordSession(sessionID string) {
s.healthMu.Lock()
s.health.SessionID = sessionID
s.health.MissedPongs = 0
status := s.health
s.healthMu.Unlock()
s.notifyHealth(status)
}
func (s *Server) recordPong(h control.Health) {
s.healthMu.Lock()
s.health.LastPong = h.LastSeen
s.health.LastRTT = h.RTT
s.health.MissedPongs = 0
status := s.health
s.healthMu.Unlock()
s.notifyHealth(status)
}
func (s *Server) recordMissed(missed int) {
s.healthMu.Lock()
s.health.MissedPongs = missed
status := s.health
s.healthMu.Unlock()
s.notifyHealth(status)
}
func (s *Server) recordUnhealthy(missed int) {
s.healthMu.Lock()
s.health.MissedPongs = missed
s.health.UnhealthyEvents++
s.health.LastUnhealthy = time.Now()
status := s.health
s.healthMu.Unlock()
s.notifyHealth(status)
}
func (s *Server) recordReconnect() {
s.healthMu.Lock()
s.health.Reconnects++
status := s.health
s.healthMu.Unlock()
s.notifyHealth(status)
}
func (s *Server) notifyHealth(status control.Status) {
if s.onHealth != nil {
s.onHealth(status)
}
}
func (s *Server) shutdown() {
s.closeSession()
if s.ln != nil {

View File

@@ -422,6 +422,7 @@ func TestStartControlLoopReportsPong(t *testing.T) {
},
},
}
s.recordSession("sid-control")
defer func() {
cancel()
s.wg.Wait()
@@ -443,6 +444,31 @@ func TestStartControlLoopReportsPong(t *testing.T) {
case <-time.After(time.Second):
t.Fatal("timed out waiting for control pong")
}
status := s.Status()
if status.SessionID != "sid-control" {
t.Fatalf("Status.SessionID = %q, want sid-control", status.SessionID)
}
if status.LastPong.IsZero() || status.LastRTT < 0 || status.MissedPongs != 0 {
t.Fatalf("Status() = %+v", status)
}
}
func TestStatusRecordsReconnectAndUnhealthy(t *testing.T) {
updates := 0
s := &Server{onHealth: func(control.Status) { updates++ }}
s.recordSession("sid-1")
s.recordMissed(2)
s.recordUnhealthy(3)
s.recordReconnect()
status := s.Status()
if status.SessionID != "sid-1" || status.MissedPongs != 3 ||
status.UnhealthyEvents != 1 || status.Reconnects != 1 || status.LastUnhealthy.IsZero() {
t.Fatalf("Status() = %+v", status)
}
if updates != 4 {
t.Fatalf("health updates = %d, want 4", updates)
}
}
//nolint:cyclop // integration-style test needs setup, proxying, and traffic assertions together.