feat: add planned session rotation

This commit is contained in:
cyber-debug
2026-05-16 00:49:52 +03:00
parent 4c6bd2b838
commit 82b5741ab1
10 changed files with 288 additions and 86 deletions

View File

@@ -26,6 +26,10 @@ liveness:
timeout: 5s
failures: 3
# Optional planned rebuild for long-running calls.
# lifecycle:
# max_session_duration: 6h
# Local SOCKS5 listener exposed to applications
socks:
host: "127.0.0.1"

View File

@@ -34,6 +34,7 @@ olcrtc /etc/olcrtc/server.yaml
| `liveness.interval` | control-stream ping interval, default `10s` |
| `liveness.timeout` | pong timeout, default `5s` |
| `liveness.failures` | missed pongs before reconnect, default `3` |
| `lifecycle.max_session_duration` | planned session rebuild interval, e.g. `6h`; unset = off |
| `gen.amount` | gen mode: number of rooms to create |
| `profiles[]` | ordered srv/cnc failover profiles |
| `failover.retry_delay` | delay before trying the next profile, e.g. `2s` |
@@ -67,6 +68,24 @@ When the failure threshold is reached, the current smux session is rebuilt.
In failover mode, a profile that exits after liveness-triggered reconnect
failure lets the supervisor advance to the next profile.
## Lifecycle Rotation
`lifecycle.max_session_duration` sets a planned upper bound for one provider
call/session. When the duration expires, olcrtc cancels the active server or
client session and starts a fresh one with the same config. While this option
is enabled, clean session endings are also restarted so the peer that did not
fire the timer can follow the rebuild. This is useful for long-running
deployments where provider calls get stale, accumulate media state, or should
be periodically re-created.
```yaml
lifecycle:
max_session_duration: 6h
```
The field is optional and disabled when omitted. Values use Go duration syntax
such as `30m`, `2h`, or `6h`; zero and negative durations are rejected.
## Failover Profiles
`mode: srv` and `mode: cnc` can define `profiles`. Top-level fields are used

View File

@@ -15,6 +15,10 @@ liveness:
timeout: 5s
failures: 3
# Optional planned rebuild for each active profile.
# lifecycle:
# max_session_duration: 6h
data: data
profiles:

View File

@@ -304,6 +304,7 @@ Implemented:
- `failover.retry_delay`.
- `failover.max_cycles`.
- Profile start/end logs.
- Planned session rotation with `lifecycle.max_session_duration`.
Still valuable:

View File

@@ -28,6 +28,10 @@ liveness:
timeout: 5s
failures: 3
# Optional planned rebuild for long-running calls.
# lifecycle:
# max_session_duration: 6h
# Outbound SOCKS5 proxy for server-side egress (optional)
socks:
proxy_addr: "" # e.g. "127.0.0.1"

View File

@@ -66,6 +66,7 @@
| `liveness.interval` | Интервал ping по control stream, по умолчанию `10s` |
| `liveness.timeout` | Сколько ждать pong, по умолчанию `5s` |
| `liveness.failures` | Сколько pong можно пропустить перед rebuild, по умолчанию `3` |
| `lifecycle.max_session_duration` | Плановый rebuild сессии после указанного времени, например `6h`; если поле не задано, выключено |
`crypto.key_file` читается относительно YAML-файла. Не указывай `crypto.key` и `crypto.key_file` одновременно.
@@ -78,6 +79,13 @@
а не только статус WebRTC/provider соединения. Если pong не приходит несколько
раз подряд, текущая smux-сессия пересоздается.
`lifecycle.max_session_duration` ограничивает длительность одного звонка /
provider session. Когда таймер истекает, текущая `srv` или `cnc` сессия
закрывается и стартует заново с тем же конфигом. Пока эта настройка включена,
чистое завершение сессии тоже перезапускается, чтобы второй peer мог догнать
плановый rebuild. Формат значения: `30m`, `2h`, `6h`; `0s` и отрицательные
значения не принимаются.
---
## mode: gen

View File

@@ -7,6 +7,7 @@ import (
"fmt"
"net"
"slices"
"sync/atomic"
"time"
"github.com/openlibrecommunity/olcrtc/internal/auth"
@@ -54,6 +55,8 @@ const (
defaultSEIAckTimeoutMS = 2000
)
var sessionRestartDelay = 2 * time.Second
var (
// ErrRoomIDRequired indicates that a room id is required for the selected carrier.
ErrRoomIDRequired = errors.New("room ID required (set room.id)")
@@ -131,46 +134,50 @@ var (
// ErrLivenessFailuresInvalid indicates that liveness.failures is not positive.
ErrLivenessFailuresInvalid = errors.New(
"invalid liveness failures (set liveness.failures to a value > 0)")
// ErrLifecycleMaxSessionDurationInvalid indicates that lifecycle.max_session_duration is not a positive duration.
ErrLifecycleMaxSessionDurationInvalid = errors.New(
"invalid max session duration (set lifecycle.max_session_duration to a duration > 0)")
)
// Config holds runtime session settings.
type Config struct {
Mode string
Link string
Transport string
Auth string
Engine string
URL string
Token string
RoomID string
KeyHex string
SOCKSHost string
SOCKSPort int
SOCKSUser string
SOCKSPass string
DNSServer string
SOCKSProxyAddr string
SOCKSProxyPort int
VideoWidth int
VideoHeight int
VideoFPS int
VideoBitrate string
VideoHW string
VideoQRSize int
VideoQRRecovery string
VideoCodec string
VideoTileModule int
VideoTileRS int
VP8FPS int
VP8BatchSize int
SEIFPS int
SEIBatchSize int
SEIFragmentSize int
SEIAckTimeoutMS int
LivenessInterval string
LivenessTimeout string
LivenessFailures int
Amount int
Mode string
Link string
Transport string
Auth string
Engine string
URL string
Token string
RoomID string
KeyHex string
SOCKSHost string
SOCKSPort int
SOCKSUser string
SOCKSPass string
DNSServer string
SOCKSProxyAddr string
SOCKSProxyPort int
VideoWidth int
VideoHeight int
VideoFPS int
VideoBitrate string
VideoHW string
VideoQRSize int
VideoQRRecovery string
VideoCodec string
VideoTileModule int
VideoTileRS int
VP8FPS int
VP8BatchSize int
SEIFPS int
SEIBatchSize int
SEIFragmentSize int
SEIAckTimeoutMS int
LivenessInterval string
LivenessTimeout string
LivenessFailures int
MaxSessionDuration string
Amount int
}
// RegisterDefaults registers built-in carriers and transports.
@@ -323,6 +330,9 @@ func Validate(cfg Config) error {
if err := validateLivenessConfig(cfg); err != nil {
return err
}
if err := validateLifecycleConfig(cfg); err != nil {
return err
}
return validateModeConfig(cfg)
}
@@ -475,6 +485,13 @@ func validateLivenessConfig(cfg Config) error {
return nil
}
func validateLifecycleConfig(cfg Config) error {
if _, err := maxSessionDuration(cfg); err != nil {
return err
}
return nil
}
func parseLivenessDuration(value string, def time.Duration) (time.Duration, error) {
if value == "" {
return def, nil
@@ -508,6 +525,20 @@ func livenessConfig(cfg Config) (control.Config, error) {
return control.Config{Interval: interval, Timeout: timeout, Failures: failures}, nil
}
func maxSessionDuration(cfg Config) (time.Duration, error) {
if cfg.MaxSessionDuration == "" {
return 0, nil
}
d, err := time.ParseDuration(cfg.MaxSessionDuration)
if err != nil {
return 0, fmt.Errorf("%w: %v", ErrLifecycleMaxSessionDurationInvalid, err)
}
if d <= 0 {
return 0, ErrLifecycleMaxSessionDurationInvalid
}
return d, nil
}
func isLoopbackListenHost(host string) bool {
if host == "localhost" {
return true
@@ -525,7 +556,21 @@ func Run(ctx context.Context, cfg Config) error {
if err != nil {
return err
}
maxDuration, err := maxSessionDuration(cfg)
if err != nil {
return err
}
run := func(ctx context.Context) error {
return runOnce(ctx, cfg, roomURL, liveness)
}
if maxDuration > 0 {
return runWithSessionRotation(ctx, maxDuration, run)
}
return run(ctx)
}
func runOnce(ctx context.Context, cfg Config, roomURL string, liveness control.Config) error {
switch cfg.Mode {
case modeSRV:
if err := server.Run(ctx, server.Config{
@@ -610,6 +655,52 @@ func Run(ctx context.Context, cfg Config) error {
}
}
func runWithSessionRotation(ctx context.Context, maxDuration time.Duration, run func(context.Context) error) error {
for cycle := 1; ; cycle++ {
currentCycle := cycle
runCtx, cancel := context.WithCancel(ctx)
var rotated atomic.Bool
timer := time.AfterFunc(maxDuration, func() {
rotated.Store(true)
logger.Infof("session max duration reached: duration=%s cycle=%d", maxDuration, currentCycle)
cancel()
})
err := run(runCtx)
cancel()
timer.Stop()
if ctx.Err() != nil {
return nil
}
if rotated.Load() {
if err != nil {
logger.Warnf("session rotation ended with error: cycle=%d err=%v", currentCycle, err)
}
logger.Infof("session rotation restarting: next_cycle=%d", currentCycle+1)
if err := waitSessionRestart(ctx); err != nil {
return nil
}
continue
}
if err != nil {
return err
}
logger.Infof("session ended cleanly with lifecycle rotation enabled: next_cycle=%d", currentCycle+1)
if err := waitSessionRestart(ctx); err != nil {
return nil
}
}
}
func waitSessionRestart(ctx context.Context) error {
select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(sessionRestartDelay):
return nil
}
}
// ValidateGen validates that the config contains enough fields to run gen mode.
func ValidateGen(cfg Config) error {
if cfg.Auth == "" {

View File

@@ -3,7 +3,9 @@ package session
import (
"context"
"errors"
"sync/atomic"
"testing"
"time"
"github.com/openlibrecommunity/olcrtc/internal/control"
)
@@ -105,6 +107,31 @@ func TestApplyLivenessDefaults(t *testing.T) {
}
}
func TestRunWithSessionRotationRestartsAfterMaxDuration(t *testing.T) {
oldRestartDelay := sessionRestartDelay
sessionRestartDelay = time.Millisecond
t.Cleanup(func() { sessionRestartDelay = oldRestartDelay })
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
var calls atomic.Int32
err := runWithSessionRotation(ctx, 5*time.Millisecond, func(ctx context.Context) error {
if calls.Add(1) >= 2 {
cancel()
return nil
}
<-ctx.Done()
return nil
})
if err != nil {
t.Fatalf("runWithSessionRotation() error = %v", err)
}
if got := calls.Load(); got < 2 {
t.Fatalf("run calls = %d, want at least 2", got)
}
}
//nolint:maintidx // table-driven validation test naturally has many cases
func TestValidate(t *testing.T) {
RegisterDefaults()
@@ -469,6 +496,32 @@ func TestValidate(t *testing.T) {
}(),
want: ErrLivenessFailuresInvalid,
},
{
name: "lifecycle accepts max session duration",
cfg: func() Config {
cfg := base
cfg.MaxSessionDuration = "1h"
return cfg
}(),
},
{
name: "lifecycle rejects bad max session duration",
cfg: func() Config {
cfg := base
cfg.MaxSessionDuration = "nope"
return cfg
}(),
want: ErrLifecycleMaxSessionDurationInvalid,
},
{
name: "lifecycle rejects zero max session duration",
cfg: func() Config {
cfg := base
cfg.MaxSessionDuration = "0s"
return cfg
}(),
want: ErrLifecycleMaxSessionDurationInvalid,
},
}
for _, tt := range tests {

View File

@@ -30,40 +30,42 @@ var (
// File is the on-disk YAML schema.
type File struct {
Mode string `yaml:"mode"`
Link string `yaml:"link"`
Auth Auth `yaml:"auth"`
Room Room `yaml:"room"`
Crypto Crypto `yaml:"crypto"`
Net Net `yaml:"net"`
SOCKS SOCKS `yaml:"socks"`
Engine Engine `yaml:"engine"`
Video Video `yaml:"video"`
VP8 VP8 `yaml:"vp8"`
SEI SEI `yaml:"sei"`
Liveness Liveness `yaml:"liveness"`
Gen Gen `yaml:"gen"`
Profiles []Profile `yaml:"profiles"`
Failover Failover `yaml:"failover"`
Data string `yaml:"data"`
Debug bool `yaml:"debug"`
FFmpeg string `yaml:"ffmpeg"`
Mode string `yaml:"mode"`
Link string `yaml:"link"`
Auth Auth `yaml:"auth"`
Room Room `yaml:"room"`
Crypto Crypto `yaml:"crypto"`
Net Net `yaml:"net"`
SOCKS SOCKS `yaml:"socks"`
Engine Engine `yaml:"engine"`
Video Video `yaml:"video"`
VP8 VP8 `yaml:"vp8"`
SEI SEI `yaml:"sei"`
Liveness Liveness `yaml:"liveness"`
Lifecycle Lifecycle `yaml:"lifecycle"`
Gen Gen `yaml:"gen"`
Profiles []Profile `yaml:"profiles"`
Failover Failover `yaml:"failover"`
Data string `yaml:"data"`
Debug bool `yaml:"debug"`
FFmpeg string `yaml:"ffmpeg"`
}
// Profile is a failover entry that overrides top-level runtime fields.
type Profile struct {
Name string `yaml:"name"`
Link string `yaml:"link"`
Auth Auth `yaml:"auth"`
Room Room `yaml:"room"`
Crypto Crypto `yaml:"crypto"`
Net Net `yaml:"net"`
SOCKS SOCKS `yaml:"socks"`
Engine Engine `yaml:"engine"`
Video Video `yaml:"video"`
VP8 VP8 `yaml:"vp8"`
SEI SEI `yaml:"sei"`
Liveness Liveness `yaml:"liveness"`
Name string `yaml:"name"`
Link string `yaml:"link"`
Auth Auth `yaml:"auth"`
Room Room `yaml:"room"`
Crypto Crypto `yaml:"crypto"`
Net Net `yaml:"net"`
SOCKS SOCKS `yaml:"socks"`
Engine Engine `yaml:"engine"`
Video Video `yaml:"video"`
VP8 VP8 `yaml:"vp8"`
SEI SEI `yaml:"sei"`
Liveness Liveness `yaml:"liveness"`
Lifecycle Lifecycle `yaml:"lifecycle"`
}
// Failover controls ordered profile failover.
@@ -146,6 +148,11 @@ type Liveness struct {
Failures int `yaml:"failures"`
}
// Lifecycle controls planned session rebuilds.
type Lifecycle struct {
MaxSessionDuration string `yaml:"max_session_duration"`
}
// Gen controls room-generation mode.
type Gen struct {
Amount int `yaml:"amount"`
@@ -260,6 +267,7 @@ func Apply(dst session.Config, f File) session.Config {
dst.LivenessInterval = pickString(dst.LivenessInterval, f.Liveness.Interval)
dst.LivenessTimeout = pickString(dst.LivenessTimeout, f.Liveness.Timeout)
dst.LivenessFailures = pickInt(dst.LivenessFailures, f.Liveness.Failures)
dst.MaxSessionDuration = pickString(dst.MaxSessionDuration, f.Lifecycle.MaxSessionDuration)
dst.Amount = pickInt(dst.Amount, f.Gen.Amount)
return dst
}
@@ -301,6 +309,7 @@ func ApplyProfile(base session.Config, p Profile) session.Config {
dst.LivenessInterval = overlayString(dst.LivenessInterval, p.Liveness.Interval)
dst.LivenessTimeout = overlayString(dst.LivenessTimeout, p.Liveness.Timeout)
dst.LivenessFailures = overlayInt(dst.LivenessFailures, p.Liveness.Failures)
dst.MaxSessionDuration = overlayString(dst.MaxSessionDuration, p.Lifecycle.MaxSessionDuration)
return dst
}

View File

@@ -43,6 +43,8 @@ liveness:
interval: 2s
timeout: 500ms
failures: 4
lifecycle:
max_session_duration: 6h
gen:
amount: 3
debug: true
@@ -80,23 +82,24 @@ func requireLoadedFile(t *testing.T, f File) {
func requireAppliedConfig(t *testing.T, got session.Config) {
t.Helper()
want := session.Config{
Mode: testModeSrv,
Link: "direct",
Auth: testAuthProvider,
RoomID: testRoomID,
KeyHex: testCryptoKey,
Transport: "datachannel",
DNSServer: "1.1.1.1:53",
SOCKSHost: "127.0.0.1",
SOCKSPort: 1080,
SOCKSUser: "u",
SOCKSPass: "p",
VP8FPS: 25,
VP8BatchSize: 4,
LivenessInterval: "2s",
LivenessTimeout: "500ms",
LivenessFailures: 4,
Amount: 3,
Mode: testModeSrv,
Link: "direct",
Auth: testAuthProvider,
RoomID: testRoomID,
KeyHex: testCryptoKey,
Transport: "datachannel",
DNSServer: "1.1.1.1:53",
SOCKSHost: "127.0.0.1",
SOCKSPort: 1080,
SOCKSUser: "u",
SOCKSPass: "p",
VP8FPS: 25,
VP8BatchSize: 4,
LivenessInterval: "2s",
LivenessTimeout: "500ms",
LivenessFailures: 4,
MaxSessionDuration: "6h",
Amount: 3,
}
if got != want {
t.Fatalf("Apply produced wrong config: %+v, want %+v", got, want)
@@ -143,6 +146,8 @@ liveness:
interval: 5s
timeout: 2s
failures: 5
lifecycle:
max_session_duration: 6h
profiles:
- name: wb-vp8
auth:
@@ -155,6 +160,8 @@ profiles:
fps: 30
liveness:
interval: 1s
lifecycle:
max_session_duration: 30m
- name: jitsi-dc
auth:
provider: jitsi
@@ -188,7 +195,8 @@ failover:
t.Fatalf("first profile = %+v", first)
}
if first.KeyHex != "shared-key" || first.DNSServer != "1.1.1.1:53" || first.VP8FPS != 30 ||
first.LivenessInterval != "1s" || first.LivenessTimeout != "2s" || first.LivenessFailures != 5 {
first.LivenessInterval != "1s" || first.LivenessTimeout != "2s" || first.LivenessFailures != 5 ||
first.MaxSessionDuration != "30m" {
t.Fatalf("first inherited/overlaid fields = %+v", first)
}
second := ApplyProfile(base, f.Profiles[1])
@@ -196,8 +204,9 @@ failover:
second.RoomID != "https://meet.example/room" || second.DNSServer != "8.8.8.8:53" {
t.Fatalf("second profile = %+v", second)
}
if second.LivenessInterval != "5s" || second.LivenessTimeout != "2s" || second.LivenessFailures != 5 {
t.Fatalf("second liveness fields = %+v", second)
if second.LivenessInterval != "5s" || second.LivenessTimeout != "2s" || second.LivenessFailures != 5 ||
second.MaxSessionDuration != "6h" {
t.Fatalf("second lifecycle/liveness fields = %+v", second)
}
}