From b0aee57aa5aa8a34d0b2769a18b319d13107d1de Mon Sep 17 00:00:00 2001 From: cyber-debug Date: Sat, 16 May 2026 00:53:00 +0300 Subject: [PATCH] feat: track failover supervisor status --- cmd/olcrtc/main.go | 30 ++++++ docs/configuration.md | 4 + docs/project-map.md | 3 +- internal/supervisor/supervisor.go | 133 +++++++++++++++++++++++++ internal/supervisor/supervisor_test.go | 85 ++++++++++++++++ 5 files changed, 254 insertions(+), 1 deletion(-) diff --git a/cmd/olcrtc/main.go b/cmd/olcrtc/main.go index af7b87f..45662af 100644 --- a/cmd/olcrtc/main.go +++ b/cmd/olcrtc/main.go @@ -214,10 +214,40 @@ func runFailoverSessionMode(dataDir string, profiles []supervisor.Profile, failo } logger.Warnf("failover cycle=%d profile=%s ended", cycle, profile.Name) }, + OnStatus: logFailoverStatus, }, runSession) }) } +func logFailoverStatus(status supervisor.Status) { + if !logger.IsVerbose() { + return + } + active := status.ActiveProfile + if active == "" { + active = "none" + } + logger.Debugf("failover status cycle=%d active=%s last_error=%q profiles=%s history=%d", + status.Cycle, active, status.LastError, formatProfileStatuses(status.Profiles), len(status.History)) +} + +func formatProfileStatuses(profiles []supervisor.ProfileStatus) string { + if len(profiles) == 0 { + return "[]" + } + var buf bytes.Buffer + buf.WriteByte('[') + for i, profile := range profiles { + if i > 0 { + buf.WriteByte(' ') + } + fmt.Fprintf(&buf, "%s{starts=%d failures=%d clean=%d}", + profile.Name, profile.Starts, profile.Failures, profile.CleanEnds) + } + buf.WriteByte(']') + return buf.String() +} + func prepareRuntimeData(dataDir string) error { if dataDir == "" { return ErrDataDirRequired diff --git a/docs/configuration.md b/docs/configuration.md index 41bdeaa..52123f1 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -127,3 +127,7 @@ failover: Both peers must use compatible profile order and room settings. This first failover layer rebuilds the session on the next profile; active smux streams do not migrate, but new connections can recover on the next profile. + +When `debug: true` is enabled, the CLI also emits a compact supervisor status +snapshot with the active profile, per-profile start/failure counters, and +bounded failover history size. diff --git a/docs/project-map.md b/docs/project-map.md index 55fd291..0b09cc3 100644 --- a/docs/project-map.md +++ b/docs/project-map.md @@ -305,13 +305,14 @@ Implemented: - `failover.max_cycles`. - Profile start/end logs. - Planned session rotation with `lifecycle.max_session_duration`. +- Shared supervisor status snapshots with bounded failover history. Still valuable: - Health scoring per profile. - Control-stream coordination before switching. - Stream draining and migration instead of dropping active smux streams. -- Shared status output for the active profile and failover history. +- User-facing status endpoint/export for the active profile and failover history. Likely files: diff --git a/internal/supervisor/supervisor.go b/internal/supervisor/supervisor.go index 929fed6..293a4eb 100644 --- a/internal/supervisor/supervisor.go +++ b/internal/supervisor/supervisor.go @@ -11,6 +11,14 @@ import ( ) const DefaultRetryDelay = 2 * time.Second +const DefaultHistoryLimit = 20 + +const ( + // EventProfileStart marks a profile attempt starting. + EventProfileStart = "profile_start" + // EventProfileEnd marks a profile attempt ending. + EventProfileEnd = "profile_end" +) var ( // ErrNoProfiles is returned when the supervisor is started without profiles. @@ -25,6 +33,36 @@ type Profile struct { Config session.Config } +// ProfileStatus summarizes one profile's failover history. +type ProfileStatus struct { + Name string + Starts int + Failures int + CleanEnds int + LastStarted time.Time + LastEnded time.Time + LastError string +} + +// Event is one bounded failover history entry. +type Event struct { + Time time.Time + Type string + Profile string + Cycle int + Error string +} + +// Status is a point-in-time view of the supervisor. +type Status struct { + Cycle int + ActiveProfile string + ActiveProfileIndex int + Profiles []ProfileStatus + History []Event + LastError string +} + // Runner starts one session profile and blocks until it ends or fails. type Runner func(ctx context.Context, cfg session.Config) error @@ -36,6 +74,8 @@ type Config struct { OnProfileStart func(profile Profile, cycle int) OnProfileEnd func(profile Profile, cycle int, err error) + OnStatus func(status Status) + HistoryLimit int } // Run starts profiles in order. If a profile exits while ctx is still active, @@ -47,6 +87,7 @@ func Run(ctx context.Context, cfg Config, run Runner) error { if cfg.RetryDelay == 0 { cfg.RetryDelay = DefaultRetryDelay } + state := newStatusTracker(cfg.Profiles, cfg.HistoryLimit, cfg.OnStatus) var lastErr error for cycle := 1; ; cycle++ { @@ -54,6 +95,7 @@ func Run(ctx context.Context, cfg Config, run Runner) error { if ctx.Err() != nil { return nil } + state.start(i, cycle) if cfg.OnProfileStart != nil { cfg.OnProfileStart(profile, cycle) } @@ -67,6 +109,7 @@ func Run(ctx context.Context, cfg Config, run Runner) error { } else { lastErr = fmt.Errorf("profile %q ended", profile.Name) } + state.end(i, cycle, err) if cfg.OnProfileEnd != nil { cfg.OnProfileEnd(profile, cycle, err) } @@ -81,6 +124,96 @@ func Run(ctx context.Context, cfg Config, run Runner) error { } } +type statusTracker struct { + status Status + notify func(Status) + historyLimit int +} + +func newStatusTracker(profiles []Profile, historyLimit int, notify func(Status)) *statusTracker { + if historyLimit == 0 { + historyLimit = DefaultHistoryLimit + } + statusProfiles := make([]ProfileStatus, 0, len(profiles)) + for _, profile := range profiles { + statusProfiles = append(statusProfiles, ProfileStatus{Name: profile.Name}) + } + return &statusTracker{ + status: Status{ + ActiveProfileIndex: -1, + Profiles: statusProfiles, + }, + notify: notify, + historyLimit: historyLimit, + } +} + +func (t *statusTracker) start(profileIndex, cycle int) { + now := time.Now() + profile := &t.status.Profiles[profileIndex] + profile.Starts++ + profile.LastStarted = now + t.status.Cycle = cycle + t.status.ActiveProfile = profile.Name + t.status.ActiveProfileIndex = profileIndex + t.appendHistory(Event{ + Time: now, + Type: EventProfileStart, + Profile: profile.Name, + Cycle: cycle, + }) + t.emit() +} + +func (t *statusTracker) end(profileIndex, cycle int, err error) { + now := time.Now() + profile := &t.status.Profiles[profileIndex] + profile.LastEnded = now + event := Event{ + Time: now, + Type: EventProfileEnd, + Profile: profile.Name, + Cycle: cycle, + } + if err != nil { + profile.Failures++ + profile.LastError = err.Error() + t.status.LastError = fmt.Sprintf("profile %q: %v", profile.Name, err) + event.Error = err.Error() + } else { + profile.CleanEnds++ + profile.LastError = "" + t.status.LastError = fmt.Sprintf("profile %q ended", profile.Name) + } + t.status.ActiveProfile = "" + t.status.ActiveProfileIndex = -1 + t.appendHistory(event) + t.emit() +} + +func (t *statusTracker) appendHistory(event Event) { + if t.historyLimit < 0 { + return + } + t.status.History = append(t.status.History, event) + if len(t.status.History) > t.historyLimit { + t.status.History = t.status.History[len(t.status.History)-t.historyLimit:] + } +} + +func (t *statusTracker) emit() { + if t.notify == nil { + return + } + t.notify(cloneStatus(t.status)) +} + +func cloneStatus(status Status) Status { + status.Profiles = append([]ProfileStatus(nil), status.Profiles...) + status.History = append([]Event(nil), status.History...) + return status +} + func waitRetryDelay(ctx context.Context, delay time.Duration) error { if delay <= 0 { return nil diff --git a/internal/supervisor/supervisor_test.go b/internal/supervisor/supervisor_test.go index aab0dee..253d310 100644 --- a/internal/supervisor/supervisor_test.go +++ b/internal/supervisor/supervisor_test.go @@ -58,6 +58,91 @@ func TestRunAdvancesProfilesAndStopsAtMaxCycles(t *testing.T) { } } +func TestRunEmitsStatusHistory(t *testing.T) { + profiles := []Profile{ + {Name: "first", Config: session.Config{Auth: "wbstream"}}, + {Name: "second", Config: session.Config{Auth: "jitsi"}}, + } + var snapshots []Status + err := Run(context.Background(), Config{ + Profiles: profiles, + RetryDelay: -1, + MaxCycles: 1, + HistoryLimit: 3, + OnStatus: func(status Status) { + snapshots = append(snapshots, status) + }, + }, func(_ context.Context, cfg session.Config) error { + if cfg.Auth == "first" { + t.Fatal("runner received profile name instead of config") + } + return errRunnerBoom + }) + if !errors.Is(err, ErrMaxCyclesExceeded) { + t.Fatalf("Run() error = %v, want %v", err, ErrMaxCyclesExceeded) + } + if len(snapshots) != 4 { + t.Fatalf("status snapshots = %d, want 4", len(snapshots)) + } + first := snapshots[0] + if first.ActiveProfile != "first" || first.ActiveProfileIndex != 0 || first.Cycle != 1 { + t.Fatalf("first status = %+v", first) + } + if first.Profiles[0].Starts != 1 || first.Profiles[0].LastStarted.IsZero() { + t.Fatalf("first profile start status = %+v", first.Profiles[0]) + } + last := snapshots[len(snapshots)-1] + if last.ActiveProfile != "" || last.ActiveProfileIndex != -1 { + t.Fatalf("last active status = %+v", last) + } + if last.Profiles[0].Failures != 1 || last.Profiles[1].Failures != 1 { + t.Fatalf("profile failures = %+v", last.Profiles) + } + if last.LastError == "" || last.Profiles[1].LastError == "" { + t.Fatalf("last errors missing: %+v", last) + } + if len(last.History) != 3 { + t.Fatalf("history length = %d, want 3", len(last.History)) + } + if last.History[0].Type != EventProfileEnd || last.History[0].Profile != "first" { + t.Fatalf("oldest bounded history event = %+v", last.History[0]) + } + if last.History[2].Type != EventProfileEnd || last.History[2].Profile != "second" || + last.History[2].Error == "" { + t.Fatalf("last history event = %+v", last.History[2]) + } +} + +func TestRunStatusSnapshotIsImmutable(t *testing.T) { + var first Status + var second Status + err := Run(context.Background(), Config{ + Profiles: []Profile{{Name: "one"}}, + RetryDelay: -1, + MaxCycles: 1, + OnStatus: func(status Status) { + if first.Profiles == nil { + first = status + first.Profiles[0].Starts = 99 + first.History[0].Profile = "mutated" + return + } + second = status + }, + }, func(context.Context, session.Config) error { + return errRunnerBoom + }) + if !errors.Is(err, ErrMaxCyclesExceeded) { + t.Fatalf("Run() error = %v, want %v", err, ErrMaxCyclesExceeded) + } + if first.Profiles[0].Starts != 99 || first.History[0].Profile != "mutated" { + t.Fatalf("test mutation did not apply to snapshot: %+v", first) + } + if second.Profiles[0].Starts != 1 || second.History[0].Profile != "one" { + t.Fatalf("snapshot mutation leaked into later status: %+v", second) + } +} + func TestRunReturnsNilOnContextCancel(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) err := Run(ctx, Config{