From 82b5741ab1ea6896c035a22331b9f0ba049f8f79 Mon Sep 17 00:00:00 2001 From: cyber-debug Date: Sat, 16 May 2026 00:49:52 +0300 Subject: [PATCH] feat: add planned session rotation --- docs/client.example.yaml | 4 + docs/configuration.md | 19 ++++ docs/failover.example.yaml | 4 + docs/project-map.md | 1 + docs/server.example.yaml | 4 + docs/settings.md | 8 ++ internal/app/session/session.go | 163 +++++++++++++++++++++------ internal/app/session/session_test.go | 53 +++++++++ internal/config/config.go | 69 +++++++----- internal/config/config_test.go | 49 ++++---- 10 files changed, 288 insertions(+), 86 deletions(-) diff --git a/docs/client.example.yaml b/docs/client.example.yaml index a074a6a..06b9b5e 100644 --- a/docs/client.example.yaml +++ b/docs/client.example.yaml @@ -26,6 +26,10 @@ liveness: timeout: 5s failures: 3 +# Optional planned rebuild for long-running calls. +# lifecycle: +# max_session_duration: 6h + # Local SOCKS5 listener exposed to applications socks: host: "127.0.0.1" diff --git a/docs/configuration.md b/docs/configuration.md index 8c067ad..41bdeaa 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -34,6 +34,7 @@ olcrtc /etc/olcrtc/server.yaml | `liveness.interval` | control-stream ping interval, default `10s` | | `liveness.timeout` | pong timeout, default `5s` | | `liveness.failures` | missed pongs before reconnect, default `3` | +| `lifecycle.max_session_duration` | planned session rebuild interval, e.g. `6h`; unset = off | | `gen.amount` | gen mode: number of rooms to create | | `profiles[]` | ordered srv/cnc failover profiles | | `failover.retry_delay` | delay before trying the next profile, e.g. `2s` | @@ -67,6 +68,24 @@ When the failure threshold is reached, the current smux session is rebuilt. In failover mode, a profile that exits after liveness-triggered reconnect failure lets the supervisor advance to the next profile. 
+## Lifecycle Rotation + +`lifecycle.max_session_duration` sets a planned upper bound for one provider +call/session. When the duration expires, olcrtc cancels the active server or +client session and starts a fresh one with the same config. While this option +is enabled, clean session endings are also restarted so the peer that did not +fire the timer can follow the rebuild. This is useful for long-running +deployments where provider calls get stale, accumulate media state, or should +be periodically re-created. + +```yaml +lifecycle: + max_session_duration: 6h +``` + +The field is optional and disabled when omitted. Values use Go duration syntax +such as `30m`, `2h`, or `6h`; zero and negative durations are rejected. + ## Failover Profiles `mode: srv` and `mode: cnc` can define `profiles`. Top-level fields are used diff --git a/docs/failover.example.yaml b/docs/failover.example.yaml index e956a35..298a847 100644 --- a/docs/failover.example.yaml +++ b/docs/failover.example.yaml @@ -15,6 +15,10 @@ liveness: timeout: 5s failures: 3 +# Optional planned rebuild for each active profile. +# lifecycle: +# max_session_duration: 6h + data: data profiles: diff --git a/docs/project-map.md b/docs/project-map.md index 4481982..55fd291 100644 --- a/docs/project-map.md +++ b/docs/project-map.md @@ -304,6 +304,7 @@ Implemented: - `failover.retry_delay`. - `failover.max_cycles`. - Profile start/end logs. +- Planned session rotation with `lifecycle.max_session_duration`. Still valuable: diff --git a/docs/server.example.yaml b/docs/server.example.yaml index c20b1e5..300f7cf 100644 --- a/docs/server.example.yaml +++ b/docs/server.example.yaml @@ -28,6 +28,10 @@ liveness: timeout: 5s failures: 3 +# Optional planned rebuild for long-running calls. +# lifecycle: +# max_session_duration: 6h + # Outbound SOCKS5 proxy for server-side egress (optional) socks: proxy_addr: "" # e.g. 
"127.0.0.1" diff --git a/docs/settings.md b/docs/settings.md index 2e2d78a..9f9d215 100644 --- a/docs/settings.md +++ b/docs/settings.md @@ -66,6 +66,7 @@ | `liveness.interval` | Интервал ping по control stream, по умолчанию `10s` | | `liveness.timeout` | Сколько ждать pong, по умолчанию `5s` | | `liveness.failures` | Сколько pong можно пропустить перед rebuild, по умолчанию `3` | +| `lifecycle.max_session_duration` | Плановый rebuild сессии после указанного времени, например `6h`; если поле не задано, выключено | `crypto.key_file` читается относительно YAML-файла. Не указывай `crypto.key` и `crypto.key_file` одновременно. @@ -78,6 +79,13 @@ а не только статус WebRTC/provider соединения. Если pong не приходит несколько раз подряд, текущая smux-сессия пересоздается. +`lifecycle.max_session_duration` ограничивает длительность одного звонка / +provider session. Когда таймер истекает, текущая `srv` или `cnc` сессия +закрывается и стартует заново с тем же конфигом. Пока эта настройка включена, +чистое завершение сессии тоже перезапускается, чтобы второй peer мог догнать +плановый rebuild. Формат значения: `30m`, `2h`, `6h`; `0s` и отрицательные +значения не принимаются. + --- ## mode: gen diff --git a/internal/app/session/session.go b/internal/app/session/session.go index 360d96a..0b48f50 100644 --- a/internal/app/session/session.go +++ b/internal/app/session/session.go @@ -7,6 +7,7 @@ import ( "fmt" "net" "slices" + "sync/atomic" "time" "github.com/openlibrecommunity/olcrtc/internal/auth" @@ -54,6 +55,8 @@ const ( defaultSEIAckTimeoutMS = 2000 ) +var sessionRestartDelay = 2 * time.Second + var ( // ErrRoomIDRequired indicates that a room id is required for the selected carrier. ErrRoomIDRequired = errors.New("room ID required (set room.id)") @@ -131,46 +134,50 @@ var ( // ErrLivenessFailuresInvalid indicates that liveness.failures is not positive. 
ErrLivenessFailuresInvalid = errors.New( "invalid liveness failures (set liveness.failures to a value > 0)") + // ErrLifecycleMaxSessionDurationInvalid indicates that lifecycle.max_session_duration is not a positive duration. + ErrLifecycleMaxSessionDurationInvalid = errors.New( + "invalid max session duration (set lifecycle.max_session_duration to a duration > 0)") ) // Config holds runtime session settings. type Config struct { - Mode string - Link string - Transport string - Auth string - Engine string - URL string - Token string - RoomID string - KeyHex string - SOCKSHost string - SOCKSPort int - SOCKSUser string - SOCKSPass string - DNSServer string - SOCKSProxyAddr string - SOCKSProxyPort int - VideoWidth int - VideoHeight int - VideoFPS int - VideoBitrate string - VideoHW string - VideoQRSize int - VideoQRRecovery string - VideoCodec string - VideoTileModule int - VideoTileRS int - VP8FPS int - VP8BatchSize int - SEIFPS int - SEIBatchSize int - SEIFragmentSize int - SEIAckTimeoutMS int - LivenessInterval string - LivenessTimeout string - LivenessFailures int - Amount int + Mode string + Link string + Transport string + Auth string + Engine string + URL string + Token string + RoomID string + KeyHex string + SOCKSHost string + SOCKSPort int + SOCKSUser string + SOCKSPass string + DNSServer string + SOCKSProxyAddr string + SOCKSProxyPort int + VideoWidth int + VideoHeight int + VideoFPS int + VideoBitrate string + VideoHW string + VideoQRSize int + VideoQRRecovery string + VideoCodec string + VideoTileModule int + VideoTileRS int + VP8FPS int + VP8BatchSize int + SEIFPS int + SEIBatchSize int + SEIFragmentSize int + SEIAckTimeoutMS int + LivenessInterval string + LivenessTimeout string + LivenessFailures int + MaxSessionDuration string + Amount int } // RegisterDefaults registers built-in carriers and transports. 
@@ -323,6 +330,9 @@ func Validate(cfg Config) error {
 	if err := validateLivenessConfig(cfg); err != nil {
 		return err
 	}
+	if err := validateLifecycleConfig(cfg); err != nil {
+		return err
+	}
 	return validateModeConfig(cfg)
 }
 
@@ -475,6 +485,13 @@ func validateLivenessConfig(cfg Config) error {
 	return nil
 }
 
+// validateLifecycleConfig reports an error when lifecycle.max_session_duration
+// is set but is not a positive Go duration.
+func validateLifecycleConfig(cfg Config) error {
+	_, err := maxSessionDuration(cfg)
+	return err
+}
+
 func parseLivenessDuration(value string, def time.Duration) (time.Duration, error) {
 	if value == "" {
 		return def, nil
@@ -508,6 +525,24 @@ func livenessConfig(cfg Config) (control.Config, error) {
 	return control.Config{Interval: interval, Timeout: timeout, Failures: failures}, nil
 }
 
+// maxSessionDuration parses cfg.MaxSessionDuration. An empty value yields 0,
+// which Run treats as "rotation disabled". A value that does not parse, or
+// that parses to a duration <= 0, yields an error matching
+// ErrLifecycleMaxSessionDurationInvalid.
+func maxSessionDuration(cfg Config) (time.Duration, error) {
+	if cfg.MaxSessionDuration == "" {
+		return 0, nil
+	}
+	d, err := time.ParseDuration(cfg.MaxSessionDuration)
+	if err != nil {
+		return 0, fmt.Errorf("%w: %v", ErrLifecycleMaxSessionDurationInvalid, err)
+	}
+	if d <= 0 {
+		return 0, ErrLifecycleMaxSessionDurationInvalid
+	}
+	return d, nil
+}
+
 func isLoopbackListenHost(host string) bool {
 	if host == "localhost" {
 		return true
@@ -525,7 +560,23 @@ func Run(ctx context.Context, cfg Config) error {
 	if err != nil {
 		return err
 	}
+	maxDuration, err := maxSessionDuration(cfg)
+	if err != nil {
+		return err
+	}
 
+	run := func(ctx context.Context) error {
+		return runOnce(ctx, cfg, roomURL, liveness)
+	}
+	if maxDuration > 0 {
+		return runWithSessionRotation(ctx, maxDuration, run)
+	}
+	return run(ctx)
+}
+
+// runOnce runs one session for cfg.Mode under ctx; Run calls it directly, or
+// once per cycle when session rotation is enabled.
+func runOnce(ctx context.Context, cfg Config, roomURL string, liveness control.Config) error {
 	switch cfg.Mode {
 	case modeSRV:
 		if err := server.Run(ctx, server.Config{
@@ -610,6 +661,65 @@ func Run(ctx context.Context, cfg Config) error {
 	}
 }
 
+// runWithSessionRotation calls run in a loop. Each cycle gets its own child
+// context, which is cancelled once maxDuration has elapsed.
+//
+// It returns nil once ctx is cancelled, and run's error when a cycle fails
+// before its rotation timer fired. A cycle that ends because the timer fired,
+// or that ends cleanly on its own, is followed by a sessionRestartDelay pause
+// and then a new cycle.
+func runWithSessionRotation(ctx context.Context, maxDuration time.Duration, run func(context.Context) error) error {
+	for cycle := 1; ; cycle++ {
+		currentCycle := cycle
+		runCtx, cancel := context.WithCancel(ctx)
+		var rotated atomic.Bool
+		timer := time.AfterFunc(maxDuration, func() {
+			// Set the flag before cancelling so it is already visible when run
+			// returns because of this cancellation.
+			rotated.Store(true)
+			logger.Infof("session max duration reached: duration=%s cycle=%d", maxDuration, currentCycle)
+			cancel()
+		})
+
+		err := run(runCtx)
+		cancel()
+		timer.Stop()
+		if ctx.Err() != nil {
+			return nil
+		}
+
+		switch {
+		case rotated.Load():
+			if err != nil {
+				logger.Warnf("session rotation ended with error: cycle=%d err=%v", currentCycle, err)
+			}
+			logger.Infof("session rotation restarting: next_cycle=%d", currentCycle+1)
+		case err != nil:
+			return err
+		default:
+			logger.Infof("session ended cleanly with lifecycle rotation enabled: next_cycle=%d", currentCycle+1)
+		}
+		if waitErr := waitSessionRestart(ctx); waitErr != nil {
+			// ctx was cancelled during the pause: normal shutdown.
+			return nil
+		}
+	}
+}
+
+// waitSessionRestart pauses for sessionRestartDelay. It returns ctx.Err() if
+// ctx is cancelled first, nil otherwise.
+func waitSessionRestart(ctx context.Context) error {
+	timer := time.NewTimer(sessionRestartDelay)
+	defer timer.Stop()
+
+	select {
+	case <-ctx.Done():
+		return ctx.Err()
+	case <-timer.C:
+		return nil
+	}
+}
+
 // ValidateGen validates that the config contains enough fields to run gen mode.
func ValidateGen(cfg Config) error { if cfg.Auth == "" { diff --git a/internal/app/session/session_test.go b/internal/app/session/session_test.go index 95270b2..5fc219d 100644 --- a/internal/app/session/session_test.go +++ b/internal/app/session/session_test.go @@ -3,7 +3,9 @@ package session import ( "context" "errors" + "sync/atomic" "testing" + "time" "github.com/openlibrecommunity/olcrtc/internal/control" ) @@ -105,6 +107,31 @@ func TestApplyLivenessDefaults(t *testing.T) { } } +func TestRunWithSessionRotationRestartsAfterMaxDuration(t *testing.T) { + oldRestartDelay := sessionRestartDelay + sessionRestartDelay = time.Millisecond + t.Cleanup(func() { sessionRestartDelay = oldRestartDelay }) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + var calls atomic.Int32 + err := runWithSessionRotation(ctx, 5*time.Millisecond, func(ctx context.Context) error { + if calls.Add(1) >= 2 { + cancel() + return nil + } + <-ctx.Done() + return nil + }) + if err != nil { + t.Fatalf("runWithSessionRotation() error = %v", err) + } + if got := calls.Load(); got < 2 { + t.Fatalf("run calls = %d, want at least 2", got) + } +} + //nolint:maintidx // table-driven validation test naturally has many cases func TestValidate(t *testing.T) { RegisterDefaults() @@ -469,6 +496,32 @@ func TestValidate(t *testing.T) { }(), want: ErrLivenessFailuresInvalid, }, + { + name: "lifecycle accepts max session duration", + cfg: func() Config { + cfg := base + cfg.MaxSessionDuration = "1h" + return cfg + }(), + }, + { + name: "lifecycle rejects bad max session duration", + cfg: func() Config { + cfg := base + cfg.MaxSessionDuration = "nope" + return cfg + }(), + want: ErrLifecycleMaxSessionDurationInvalid, + }, + { + name: "lifecycle rejects zero max session duration", + cfg: func() Config { + cfg := base + cfg.MaxSessionDuration = "0s" + return cfg + }(), + want: ErrLifecycleMaxSessionDurationInvalid, + }, } for _, tt := range tests { diff --git a/internal/config/config.go 
b/internal/config/config.go index 9524363..770adf5 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -30,40 +30,42 @@ var ( // File is the on-disk YAML schema. type File struct { - Mode string `yaml:"mode"` - Link string `yaml:"link"` - Auth Auth `yaml:"auth"` - Room Room `yaml:"room"` - Crypto Crypto `yaml:"crypto"` - Net Net `yaml:"net"` - SOCKS SOCKS `yaml:"socks"` - Engine Engine `yaml:"engine"` - Video Video `yaml:"video"` - VP8 VP8 `yaml:"vp8"` - SEI SEI `yaml:"sei"` - Liveness Liveness `yaml:"liveness"` - Gen Gen `yaml:"gen"` - Profiles []Profile `yaml:"profiles"` - Failover Failover `yaml:"failover"` - Data string `yaml:"data"` - Debug bool `yaml:"debug"` - FFmpeg string `yaml:"ffmpeg"` + Mode string `yaml:"mode"` + Link string `yaml:"link"` + Auth Auth `yaml:"auth"` + Room Room `yaml:"room"` + Crypto Crypto `yaml:"crypto"` + Net Net `yaml:"net"` + SOCKS SOCKS `yaml:"socks"` + Engine Engine `yaml:"engine"` + Video Video `yaml:"video"` + VP8 VP8 `yaml:"vp8"` + SEI SEI `yaml:"sei"` + Liveness Liveness `yaml:"liveness"` + Lifecycle Lifecycle `yaml:"lifecycle"` + Gen Gen `yaml:"gen"` + Profiles []Profile `yaml:"profiles"` + Failover Failover `yaml:"failover"` + Data string `yaml:"data"` + Debug bool `yaml:"debug"` + FFmpeg string `yaml:"ffmpeg"` } // Profile is a failover entry that overrides top-level runtime fields. 
type Profile struct { - Name string `yaml:"name"` - Link string `yaml:"link"` - Auth Auth `yaml:"auth"` - Room Room `yaml:"room"` - Crypto Crypto `yaml:"crypto"` - Net Net `yaml:"net"` - SOCKS SOCKS `yaml:"socks"` - Engine Engine `yaml:"engine"` - Video Video `yaml:"video"` - VP8 VP8 `yaml:"vp8"` - SEI SEI `yaml:"sei"` - Liveness Liveness `yaml:"liveness"` + Name string `yaml:"name"` + Link string `yaml:"link"` + Auth Auth `yaml:"auth"` + Room Room `yaml:"room"` + Crypto Crypto `yaml:"crypto"` + Net Net `yaml:"net"` + SOCKS SOCKS `yaml:"socks"` + Engine Engine `yaml:"engine"` + Video Video `yaml:"video"` + VP8 VP8 `yaml:"vp8"` + SEI SEI `yaml:"sei"` + Liveness Liveness `yaml:"liveness"` + Lifecycle Lifecycle `yaml:"lifecycle"` } // Failover controls ordered profile failover. @@ -146,6 +148,11 @@ type Liveness struct { Failures int `yaml:"failures"` } +// Lifecycle controls planned session rebuilds. +type Lifecycle struct { + MaxSessionDuration string `yaml:"max_session_duration"` +} + // Gen controls room-generation mode. 
type Gen struct { Amount int `yaml:"amount"` @@ -260,6 +267,7 @@ func Apply(dst session.Config, f File) session.Config { dst.LivenessInterval = pickString(dst.LivenessInterval, f.Liveness.Interval) dst.LivenessTimeout = pickString(dst.LivenessTimeout, f.Liveness.Timeout) dst.LivenessFailures = pickInt(dst.LivenessFailures, f.Liveness.Failures) + dst.MaxSessionDuration = pickString(dst.MaxSessionDuration, f.Lifecycle.MaxSessionDuration) dst.Amount = pickInt(dst.Amount, f.Gen.Amount) return dst } @@ -301,6 +309,7 @@ func ApplyProfile(base session.Config, p Profile) session.Config { dst.LivenessInterval = overlayString(dst.LivenessInterval, p.Liveness.Interval) dst.LivenessTimeout = overlayString(dst.LivenessTimeout, p.Liveness.Timeout) dst.LivenessFailures = overlayInt(dst.LivenessFailures, p.Liveness.Failures) + dst.MaxSessionDuration = overlayString(dst.MaxSessionDuration, p.Lifecycle.MaxSessionDuration) return dst } diff --git a/internal/config/config_test.go b/internal/config/config_test.go index b41604c..06d1406 100644 --- a/internal/config/config_test.go +++ b/internal/config/config_test.go @@ -43,6 +43,8 @@ liveness: interval: 2s timeout: 500ms failures: 4 +lifecycle: + max_session_duration: 6h gen: amount: 3 debug: true @@ -80,23 +82,24 @@ func requireLoadedFile(t *testing.T, f File) { func requireAppliedConfig(t *testing.T, got session.Config) { t.Helper() want := session.Config{ - Mode: testModeSrv, - Link: "direct", - Auth: testAuthProvider, - RoomID: testRoomID, - KeyHex: testCryptoKey, - Transport: "datachannel", - DNSServer: "1.1.1.1:53", - SOCKSHost: "127.0.0.1", - SOCKSPort: 1080, - SOCKSUser: "u", - SOCKSPass: "p", - VP8FPS: 25, - VP8BatchSize: 4, - LivenessInterval: "2s", - LivenessTimeout: "500ms", - LivenessFailures: 4, - Amount: 3, + Mode: testModeSrv, + Link: "direct", + Auth: testAuthProvider, + RoomID: testRoomID, + KeyHex: testCryptoKey, + Transport: "datachannel", + DNSServer: "1.1.1.1:53", + SOCKSHost: "127.0.0.1", + SOCKSPort: 1080, + 
SOCKSUser: "u", + SOCKSPass: "p", + VP8FPS: 25, + VP8BatchSize: 4, + LivenessInterval: "2s", + LivenessTimeout: "500ms", + LivenessFailures: 4, + MaxSessionDuration: "6h", + Amount: 3, } if got != want { t.Fatalf("Apply produced wrong config: %+v, want %+v", got, want) @@ -143,6 +146,8 @@ liveness: interval: 5s timeout: 2s failures: 5 +lifecycle: + max_session_duration: 6h profiles: - name: wb-vp8 auth: @@ -155,6 +160,8 @@ profiles: fps: 30 liveness: interval: 1s + lifecycle: + max_session_duration: 30m - name: jitsi-dc auth: provider: jitsi @@ -188,7 +195,8 @@ failover: t.Fatalf("first profile = %+v", first) } if first.KeyHex != "shared-key" || first.DNSServer != "1.1.1.1:53" || first.VP8FPS != 30 || - first.LivenessInterval != "1s" || first.LivenessTimeout != "2s" || first.LivenessFailures != 5 { + first.LivenessInterval != "1s" || first.LivenessTimeout != "2s" || first.LivenessFailures != 5 || + first.MaxSessionDuration != "30m" { t.Fatalf("first inherited/overlaid fields = %+v", first) } second := ApplyProfile(base, f.Profiles[1]) @@ -196,8 +204,9 @@ failover: second.RoomID != "https://meet.example/room" || second.DNSServer != "8.8.8.8:53" { t.Fatalf("second profile = %+v", second) } - if second.LivenessInterval != "5s" || second.LivenessTimeout != "2s" || second.LivenessFailures != 5 { - t.Fatalf("second liveness fields = %+v", second) + if second.LivenessInterval != "5s" || second.LivenessTimeout != "2s" || second.LivenessFailures != 5 || + second.MaxSessionDuration != "6h" { + t.Fatalf("second lifecycle/liveness fields = %+v", second) } }