mirror of
https://github.com/openlibrecommunity/olcrtc.git
synced 2026-05-26 07:08:11 +00:00
feat: track failover supervisor status
This commit is contained in:
@@ -214,10 +214,40 @@ func runFailoverSessionMode(dataDir string, profiles []supervisor.Profile, failo
|
||||
}
|
||||
logger.Warnf("failover cycle=%d profile=%s ended", cycle, profile.Name)
|
||||
},
|
||||
OnStatus: logFailoverStatus,
|
||||
}, runSession)
|
||||
})
|
||||
}
|
||||
|
||||
func logFailoverStatus(status supervisor.Status) {
|
||||
if !logger.IsVerbose() {
|
||||
return
|
||||
}
|
||||
active := status.ActiveProfile
|
||||
if active == "" {
|
||||
active = "none"
|
||||
}
|
||||
logger.Debugf("failover status cycle=%d active=%s last_error=%q profiles=%s history=%d",
|
||||
status.Cycle, active, status.LastError, formatProfileStatuses(status.Profiles), len(status.History))
|
||||
}
|
||||
|
||||
func formatProfileStatuses(profiles []supervisor.ProfileStatus) string {
|
||||
if len(profiles) == 0 {
|
||||
return "[]"
|
||||
}
|
||||
var buf bytes.Buffer
|
||||
buf.WriteByte('[')
|
||||
for i, profile := range profiles {
|
||||
if i > 0 {
|
||||
buf.WriteByte(' ')
|
||||
}
|
||||
fmt.Fprintf(&buf, "%s{starts=%d failures=%d clean=%d}",
|
||||
profile.Name, profile.Starts, profile.Failures, profile.CleanEnds)
|
||||
}
|
||||
buf.WriteByte(']')
|
||||
return buf.String()
|
||||
}
|
||||
|
||||
func prepareRuntimeData(dataDir string) error {
|
||||
if dataDir == "" {
|
||||
return ErrDataDirRequired
|
||||
|
||||
@@ -127,3 +127,7 @@ failover:
|
||||
Both peers must use compatible profile order and room settings. This first
|
||||
failover layer rebuilds the session on the next profile; active smux streams
|
||||
do not migrate, but new connections can recover on the next profile.
|
||||
|
||||
When `debug: true` is enabled, the CLI also emits a compact supervisor status
|
||||
snapshot with the active profile, per-profile start/failure counters, and
|
||||
bounded failover history size.
|
||||
|
||||
@@ -305,13 +305,14 @@ Implemented:
|
||||
- `failover.max_cycles`.
|
||||
- Profile start/end logs.
|
||||
- Planned session rotation with `lifecycle.max_session_duration`.
|
||||
- Shared supervisor status snapshots with bounded failover history.
|
||||
|
||||
Still valuable:
|
||||
|
||||
- Health scoring per profile.
|
||||
- Control-stream coordination before switching.
|
||||
- Stream draining and migration instead of dropping active smux streams.
|
||||
- Shared status output for the active profile and failover history.
|
||||
- User-facing status endpoint/export for the active profile and failover history.
|
||||
|
||||
Likely files:
|
||||
|
||||
|
||||
@@ -11,6 +11,14 @@ import (
|
||||
)
|
||||
|
||||
const DefaultRetryDelay = 2 * time.Second
|
||||
const DefaultHistoryLimit = 20
|
||||
|
||||
const (
|
||||
// EventProfileStart marks a profile attempt starting.
|
||||
EventProfileStart = "profile_start"
|
||||
// EventProfileEnd marks a profile attempt ending.
|
||||
EventProfileEnd = "profile_end"
|
||||
)
|
||||
|
||||
var (
|
||||
// ErrNoProfiles is returned when the supervisor is started without profiles.
|
||||
@@ -25,6 +33,36 @@ type Profile struct {
|
||||
Config session.Config
|
||||
}
|
||||
|
||||
// ProfileStatus summarizes one profile's failover history.
|
||||
type ProfileStatus struct {
|
||||
Name string
|
||||
Starts int
|
||||
Failures int
|
||||
CleanEnds int
|
||||
LastStarted time.Time
|
||||
LastEnded time.Time
|
||||
LastError string
|
||||
}
|
||||
|
||||
// Event is one bounded failover history entry.
|
||||
type Event struct {
|
||||
Time time.Time
|
||||
Type string
|
||||
Profile string
|
||||
Cycle int
|
||||
Error string
|
||||
}
|
||||
|
||||
// Status is a point-in-time view of the supervisor.
|
||||
type Status struct {
|
||||
Cycle int
|
||||
ActiveProfile string
|
||||
ActiveProfileIndex int
|
||||
Profiles []ProfileStatus
|
||||
History []Event
|
||||
LastError string
|
||||
}
|
||||
|
||||
// Runner starts one session profile and blocks until it ends or fails.
|
||||
type Runner func(ctx context.Context, cfg session.Config) error
|
||||
|
||||
@@ -36,6 +74,8 @@ type Config struct {
|
||||
|
||||
OnProfileStart func(profile Profile, cycle int)
|
||||
OnProfileEnd func(profile Profile, cycle int, err error)
|
||||
OnStatus func(status Status)
|
||||
HistoryLimit int
|
||||
}
|
||||
|
||||
// Run starts profiles in order. If a profile exits while ctx is still active,
|
||||
@@ -47,6 +87,7 @@ func Run(ctx context.Context, cfg Config, run Runner) error {
|
||||
if cfg.RetryDelay == 0 {
|
||||
cfg.RetryDelay = DefaultRetryDelay
|
||||
}
|
||||
state := newStatusTracker(cfg.Profiles, cfg.HistoryLimit, cfg.OnStatus)
|
||||
|
||||
var lastErr error
|
||||
for cycle := 1; ; cycle++ {
|
||||
@@ -54,6 +95,7 @@ func Run(ctx context.Context, cfg Config, run Runner) error {
|
||||
if ctx.Err() != nil {
|
||||
return nil
|
||||
}
|
||||
state.start(i, cycle)
|
||||
if cfg.OnProfileStart != nil {
|
||||
cfg.OnProfileStart(profile, cycle)
|
||||
}
|
||||
@@ -67,6 +109,7 @@ func Run(ctx context.Context, cfg Config, run Runner) error {
|
||||
} else {
|
||||
lastErr = fmt.Errorf("profile %q ended", profile.Name)
|
||||
}
|
||||
state.end(i, cycle, err)
|
||||
if cfg.OnProfileEnd != nil {
|
||||
cfg.OnProfileEnd(profile, cycle, err)
|
||||
}
|
||||
@@ -81,6 +124,96 @@ func Run(ctx context.Context, cfg Config, run Runner) error {
|
||||
}
|
||||
}
|
||||
|
||||
type statusTracker struct {
|
||||
status Status
|
||||
notify func(Status)
|
||||
historyLimit int
|
||||
}
|
||||
|
||||
func newStatusTracker(profiles []Profile, historyLimit int, notify func(Status)) *statusTracker {
|
||||
if historyLimit == 0 {
|
||||
historyLimit = DefaultHistoryLimit
|
||||
}
|
||||
statusProfiles := make([]ProfileStatus, 0, len(profiles))
|
||||
for _, profile := range profiles {
|
||||
statusProfiles = append(statusProfiles, ProfileStatus{Name: profile.Name})
|
||||
}
|
||||
return &statusTracker{
|
||||
status: Status{
|
||||
ActiveProfileIndex: -1,
|
||||
Profiles: statusProfiles,
|
||||
},
|
||||
notify: notify,
|
||||
historyLimit: historyLimit,
|
||||
}
|
||||
}
|
||||
|
||||
func (t *statusTracker) start(profileIndex, cycle int) {
|
||||
now := time.Now()
|
||||
profile := &t.status.Profiles[profileIndex]
|
||||
profile.Starts++
|
||||
profile.LastStarted = now
|
||||
t.status.Cycle = cycle
|
||||
t.status.ActiveProfile = profile.Name
|
||||
t.status.ActiveProfileIndex = profileIndex
|
||||
t.appendHistory(Event{
|
||||
Time: now,
|
||||
Type: EventProfileStart,
|
||||
Profile: profile.Name,
|
||||
Cycle: cycle,
|
||||
})
|
||||
t.emit()
|
||||
}
|
||||
|
||||
func (t *statusTracker) end(profileIndex, cycle int, err error) {
|
||||
now := time.Now()
|
||||
profile := &t.status.Profiles[profileIndex]
|
||||
profile.LastEnded = now
|
||||
event := Event{
|
||||
Time: now,
|
||||
Type: EventProfileEnd,
|
||||
Profile: profile.Name,
|
||||
Cycle: cycle,
|
||||
}
|
||||
if err != nil {
|
||||
profile.Failures++
|
||||
profile.LastError = err.Error()
|
||||
t.status.LastError = fmt.Sprintf("profile %q: %v", profile.Name, err)
|
||||
event.Error = err.Error()
|
||||
} else {
|
||||
profile.CleanEnds++
|
||||
profile.LastError = ""
|
||||
t.status.LastError = fmt.Sprintf("profile %q ended", profile.Name)
|
||||
}
|
||||
t.status.ActiveProfile = ""
|
||||
t.status.ActiveProfileIndex = -1
|
||||
t.appendHistory(event)
|
||||
t.emit()
|
||||
}
|
||||
|
||||
func (t *statusTracker) appendHistory(event Event) {
|
||||
if t.historyLimit < 0 {
|
||||
return
|
||||
}
|
||||
t.status.History = append(t.status.History, event)
|
||||
if len(t.status.History) > t.historyLimit {
|
||||
t.status.History = t.status.History[len(t.status.History)-t.historyLimit:]
|
||||
}
|
||||
}
|
||||
|
||||
func (t *statusTracker) emit() {
|
||||
if t.notify == nil {
|
||||
return
|
||||
}
|
||||
t.notify(cloneStatus(t.status))
|
||||
}
|
||||
|
||||
func cloneStatus(status Status) Status {
|
||||
status.Profiles = append([]ProfileStatus(nil), status.Profiles...)
|
||||
status.History = append([]Event(nil), status.History...)
|
||||
return status
|
||||
}
|
||||
|
||||
func waitRetryDelay(ctx context.Context, delay time.Duration) error {
|
||||
if delay <= 0 {
|
||||
return nil
|
||||
|
||||
@@ -58,6 +58,91 @@ func TestRunAdvancesProfilesAndStopsAtMaxCycles(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestRunEmitsStatusHistory(t *testing.T) {
|
||||
profiles := []Profile{
|
||||
{Name: "first", Config: session.Config{Auth: "wbstream"}},
|
||||
{Name: "second", Config: session.Config{Auth: "jitsi"}},
|
||||
}
|
||||
var snapshots []Status
|
||||
err := Run(context.Background(), Config{
|
||||
Profiles: profiles,
|
||||
RetryDelay: -1,
|
||||
MaxCycles: 1,
|
||||
HistoryLimit: 3,
|
||||
OnStatus: func(status Status) {
|
||||
snapshots = append(snapshots, status)
|
||||
},
|
||||
}, func(_ context.Context, cfg session.Config) error {
|
||||
if cfg.Auth == "first" {
|
||||
t.Fatal("runner received profile name instead of config")
|
||||
}
|
||||
return errRunnerBoom
|
||||
})
|
||||
if !errors.Is(err, ErrMaxCyclesExceeded) {
|
||||
t.Fatalf("Run() error = %v, want %v", err, ErrMaxCyclesExceeded)
|
||||
}
|
||||
if len(snapshots) != 4 {
|
||||
t.Fatalf("status snapshots = %d, want 4", len(snapshots))
|
||||
}
|
||||
first := snapshots[0]
|
||||
if first.ActiveProfile != "first" || first.ActiveProfileIndex != 0 || first.Cycle != 1 {
|
||||
t.Fatalf("first status = %+v", first)
|
||||
}
|
||||
if first.Profiles[0].Starts != 1 || first.Profiles[0].LastStarted.IsZero() {
|
||||
t.Fatalf("first profile start status = %+v", first.Profiles[0])
|
||||
}
|
||||
last := snapshots[len(snapshots)-1]
|
||||
if last.ActiveProfile != "" || last.ActiveProfileIndex != -1 {
|
||||
t.Fatalf("last active status = %+v", last)
|
||||
}
|
||||
if last.Profiles[0].Failures != 1 || last.Profiles[1].Failures != 1 {
|
||||
t.Fatalf("profile failures = %+v", last.Profiles)
|
||||
}
|
||||
if last.LastError == "" || last.Profiles[1].LastError == "" {
|
||||
t.Fatalf("last errors missing: %+v", last)
|
||||
}
|
||||
if len(last.History) != 3 {
|
||||
t.Fatalf("history length = %d, want 3", len(last.History))
|
||||
}
|
||||
if last.History[0].Type != EventProfileEnd || last.History[0].Profile != "first" {
|
||||
t.Fatalf("oldest bounded history event = %+v", last.History[0])
|
||||
}
|
||||
if last.History[2].Type != EventProfileEnd || last.History[2].Profile != "second" ||
|
||||
last.History[2].Error == "" {
|
||||
t.Fatalf("last history event = %+v", last.History[2])
|
||||
}
|
||||
}
|
||||
|
||||
func TestRunStatusSnapshotIsImmutable(t *testing.T) {
|
||||
var first Status
|
||||
var second Status
|
||||
err := Run(context.Background(), Config{
|
||||
Profiles: []Profile{{Name: "one"}},
|
||||
RetryDelay: -1,
|
||||
MaxCycles: 1,
|
||||
OnStatus: func(status Status) {
|
||||
if first.Profiles == nil {
|
||||
first = status
|
||||
first.Profiles[0].Starts = 99
|
||||
first.History[0].Profile = "mutated"
|
||||
return
|
||||
}
|
||||
second = status
|
||||
},
|
||||
}, func(context.Context, session.Config) error {
|
||||
return errRunnerBoom
|
||||
})
|
||||
if !errors.Is(err, ErrMaxCyclesExceeded) {
|
||||
t.Fatalf("Run() error = %v, want %v", err, ErrMaxCyclesExceeded)
|
||||
}
|
||||
if first.Profiles[0].Starts != 99 || first.History[0].Profile != "mutated" {
|
||||
t.Fatalf("test mutation did not apply to snapshot: %+v", first)
|
||||
}
|
||||
if second.Profiles[0].Starts != 1 || second.History[0].Profile != "one" {
|
||||
t.Fatalf("snapshot mutation leaked into later status: %+v", second)
|
||||
}
|
||||
}
|
||||
|
||||
func TestRunReturnsNilOnContextCancel(t *testing.T) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
err := Run(ctx, Config{
|
||||
|
||||
Reference in New Issue
Block a user