refactor: improve error context and test clarity

This commit is contained in:
zarazaex69
2026-05-16 04:06:55 +03:00
parent 6116130e3b
commit 6222896921
19 changed files with 214 additions and 120 deletions

View File

@@ -161,7 +161,7 @@ func TestRunWithArgsAppliesTransportDefaults(t *testing.T) {
oldRunSession := runSession
t.Cleanup(func() { runSession = oldRunSession })
runSession = func(ctx context.Context, cfg session.Config) error {
runSession = func(_ context.Context, cfg session.Config) error {
if cfg.VP8FPS != 25 || cfg.VP8BatchSize != 1 {
t.Fatalf("VP8 defaults = fps %d batch %d, want 25/1", cfg.VP8FPS, cfg.VP8BatchSize)
}
@@ -200,7 +200,7 @@ func TestRunWithArgsFailoverProfiles(t *testing.T) {
oldRunSession := runSession
t.Cleanup(func() { runSession = oldRunSession })
var seen []string
runSession = func(ctx context.Context, cfg session.Config) error {
runSession = func(_ context.Context, cfg session.Config) error {
seen = append(seen, cfg.Auth+"/"+cfg.Transport)
if cfg.Auth == "wbstream" && (cfg.VP8FPS != 25 || cfg.VP8BatchSize != 1) {
t.Fatalf("VP8 defaults = fps %d batch %d, want 25/1", cfg.VP8FPS, cfg.VP8BatchSize)

View File

@@ -56,7 +56,7 @@ const (
defaultSEIAckTimeoutMS = 2000
)
var sessionRestartDelay = 2 * time.Second
var sessionRestartDelay = 2 * time.Second //nolint:gochecknoglobals // tests shorten lifecycle rotation delay
var (
// ErrRoomIDRequired indicates that a room id is required for the selected carrier.
@@ -147,6 +147,8 @@ var (
// ErrTrafficMaxDelayInvalid indicates that traffic.max_delay is not a non-negative duration.
ErrTrafficMaxDelayInvalid = errors.New(
"invalid traffic max delay (set traffic.max_delay to a duration >= 0 and >= traffic.min_delay)")
errPositiveDuration = errors.New("duration must be > 0")
errNonNegativeDuration = errors.New("duration must be >= 0")
)
// Config holds runtime session settings.
@@ -264,20 +266,15 @@ func applyVideoDefaults(cfg Config) Config {
if cfg.VideoCodec == "" {
cfg.VideoCodec = videoCodecQRCode
}
width := defaultVideoWidth
if cfg.VideoCodec == videoCodecTile {
if cfg.VideoWidth == 0 {
cfg.VideoWidth = 1080
}
if cfg.VideoHeight == 0 {
cfg.VideoHeight = 1080
}
} else {
if cfg.VideoWidth == 0 {
cfg.VideoWidth = defaultVideoWidth
}
if cfg.VideoHeight == 0 {
cfg.VideoHeight = defaultVideoHeight
}
width = defaultVideoHeight
}
if cfg.VideoWidth == 0 {
cfg.VideoWidth = width
}
if cfg.VideoHeight == 0 {
cfg.VideoHeight = defaultVideoHeight
}
if cfg.VideoFPS == 0 {
cfg.VideoFPS = defaultVideoFPS
@@ -490,10 +487,10 @@ func validateModeConfig(cfg Config) error {
func validateLivenessConfig(cfg Config) error {
if _, err := parseLivenessDuration(cfg.LivenessInterval, control.DefaultInterval); err != nil {
return fmt.Errorf("%w: %v", ErrLivenessIntervalInvalid, err)
return fmt.Errorf("%w: %w", ErrLivenessIntervalInvalid, err)
}
if _, err := parseLivenessDuration(cfg.LivenessTimeout, control.DefaultTimeout); err != nil {
return fmt.Errorf("%w: %v", ErrLivenessTimeoutInvalid, err)
return fmt.Errorf("%w: %w", ErrLivenessTimeoutInvalid, err)
}
if cfg.LivenessFailures < 0 {
return ErrLivenessFailuresInvalid
@@ -514,10 +511,10 @@ func parseLivenessDuration(value string, def time.Duration) (time.Duration, erro
}
d, err := time.ParseDuration(value)
if err != nil {
return 0, err
return 0, fmt.Errorf("parse duration: %w", err)
}
if d <= 0 {
return 0, fmt.Errorf("duration must be > 0")
return 0, errPositiveDuration
}
return d, nil
}
@@ -525,11 +522,11 @@ func parseLivenessDuration(value string, def time.Duration) (time.Duration, erro
func livenessConfig(cfg Config) (control.Config, error) {
interval, err := parseLivenessDuration(cfg.LivenessInterval, control.DefaultInterval)
if err != nil {
return control.Config{}, fmt.Errorf("%w: %v", ErrLivenessIntervalInvalid, err)
return control.Config{}, fmt.Errorf("%w: %w", ErrLivenessIntervalInvalid, err)
}
timeout, err := parseLivenessDuration(cfg.LivenessTimeout, control.DefaultTimeout)
if err != nil {
return control.Config{}, fmt.Errorf("%w: %v", ErrLivenessTimeoutInvalid, err)
return control.Config{}, fmt.Errorf("%w: %w", ErrLivenessTimeoutInvalid, err)
}
failures := cfg.LivenessFailures
if failures == 0 {
@@ -547,7 +544,7 @@ func maxSessionDuration(cfg Config) (time.Duration, error) {
}
d, err := time.ParseDuration(cfg.MaxSessionDuration)
if err != nil {
return 0, fmt.Errorf("%w: %v", ErrLifecycleMaxSessionDurationInvalid, err)
return 0, fmt.Errorf("%w: %w", ErrLifecycleMaxSessionDurationInvalid, err)
}
if d <= 0 {
return 0, ErrLifecycleMaxSessionDurationInvalid
@@ -567,11 +564,11 @@ func trafficConfig(cfg Config) (transport.TrafficConfig, error) {
}
minDelay, err := parseOptionalNonNegativeDuration(cfg.TrafficMinDelay)
if err != nil {
return transport.TrafficConfig{}, fmt.Errorf("%w: %v", ErrTrafficMinDelayInvalid, err)
return transport.TrafficConfig{}, fmt.Errorf("%w: %w", ErrTrafficMinDelayInvalid, err)
}
maxDelay, err := parseOptionalNonNegativeDuration(cfg.TrafficMaxDelay)
if err != nil {
return transport.TrafficConfig{}, fmt.Errorf("%w: %v", ErrTrafficMaxDelayInvalid, err)
return transport.TrafficConfig{}, fmt.Errorf("%w: %w", ErrTrafficMaxDelayInvalid, err)
}
if maxDelay > 0 && maxDelay < minDelay {
return transport.TrafficConfig{}, ErrTrafficMaxDelayInvalid
@@ -589,10 +586,10 @@ func parseOptionalNonNegativeDuration(value string) (time.Duration, error) {
}
d, err := time.ParseDuration(value)
if err != nil {
return 0, err
return 0, fmt.Errorf("parse duration: %w", err)
}
if d < 0 {
return 0, fmt.Errorf("duration must be >= 0")
return 0, errNonNegativeDuration
}
return d, nil
}
@@ -740,7 +737,7 @@ func runWithSessionRotation(ctx context.Context, maxDuration time.Duration, run
cancel()
timer.Stop()
if ctx.Err() != nil {
return nil
return nil //nolint:nilerr // parent cancellation is normal shutdown for rotation
}
if rotated.Load() {
if err != nil {
@@ -748,7 +745,7 @@ func runWithSessionRotation(ctx context.Context, maxDuration time.Duration, run
}
logger.Infof("session rotation restarting: next_cycle=%d", currentCycle+1)
if err := waitSessionRestart(ctx); err != nil {
return nil
return nil //nolint:nilerr // canceled restart delay means normal shutdown
}
continue
}
@@ -757,7 +754,7 @@ func runWithSessionRotation(ctx context.Context, maxDuration time.Duration, run
}
logger.Infof("session ended cleanly with lifecycle rotation enabled: next_cycle=%d", currentCycle+1)
if err := waitSessionRestart(ctx); err != nil {
return nil
return nil //nolint:nilerr // canceled restart delay means normal shutdown
}
}
}
@@ -765,7 +762,7 @@ func runWithSessionRotation(ctx context.Context, maxDuration time.Duration, run
func waitSessionRestart(ctx context.Context) error {
select {
case <-ctx.Done():
return ctx.Err()
return fmt.Errorf("restart delay canceled: %w", ctx.Err())
case <-time.After(sessionRestartDelay):
return nil
}

View File

@@ -11,6 +11,8 @@ import (
"github.com/openlibrecommunity/olcrtc/internal/crypto"
)
const testBadDuration = "nope"
func TestApplyTransportDefaults(t *testing.T) {
tests := []struct {
name string
@@ -42,7 +44,7 @@ func TestApplyTransportDefaults(t *testing.T) {
VideoHeight: 1080,
VideoFPS: 30,
VideoBitrate: "2M",
VideoHW: "none",
VideoHW: defaultVideoHW,
VideoQRRecovery: "low",
VideoCodec: videoCodecQRCode,
},
@@ -56,7 +58,7 @@ func TestApplyTransportDefaults(t *testing.T) {
VideoHeight: 1080,
VideoFPS: 30,
VideoBitrate: "2M",
VideoHW: "none",
VideoHW: defaultVideoHW,
VideoQRRecovery: "low",
VideoCodec: videoCodecTile,
},
@@ -253,7 +255,7 @@ func TestValidate(t *testing.T) {
cfg.VideoHeight = 480
cfg.VideoFPS = 30
cfg.VideoBitrate = "1M"
cfg.VideoHW = "none" //nolint:goconst // test literal, repetition is intentional
cfg.VideoHW = defaultVideoHW
cfg.VideoCodec = "bogus"
return cfg
}(),
@@ -314,7 +316,7 @@ func TestValidate(t *testing.T) {
cfg.VideoHeight = 480
cfg.VideoFPS = 30
cfg.VideoBitrate = "1M"
cfg.VideoHW = "none"
cfg.VideoHW = defaultVideoHW
cfg.VideoCodec = "tile"
return cfg
}(),
@@ -329,7 +331,7 @@ func TestValidate(t *testing.T) {
cfg.VideoHeight = 1080
cfg.VideoFPS = 30
cfg.VideoBitrate = "1M"
cfg.VideoHW = "none"
cfg.VideoHW = defaultVideoHW
cfg.VideoCodec = "tile"
return cfg
}(),
@@ -474,7 +476,7 @@ func TestValidate(t *testing.T) {
name: "liveness rejects bad interval",
cfg: func() Config {
cfg := base
cfg.LivenessInterval = "nope"
cfg.LivenessInterval = testBadDuration
return cfg
}(),
want: ErrLivenessIntervalInvalid,
@@ -509,7 +511,7 @@ func TestValidate(t *testing.T) {
name: "lifecycle rejects bad max session duration",
cfg: func() Config {
cfg := base
cfg.MaxSessionDuration = "nope"
cfg.MaxSessionDuration = testBadDuration
return cfg
}(),
want: ErrLifecycleMaxSessionDurationInvalid,
@@ -555,7 +557,7 @@ func TestValidate(t *testing.T) {
name: "traffic rejects bad min delay",
cfg: func() Config {
cfg := base
cfg.TrafficMinDelay = "nope"
cfg.TrafficMinDelay = testBadDuration
return cfg
}(),
want: ErrTrafficMinDelayInvalid,

View File

@@ -120,7 +120,7 @@ func createMeeting(ctx context.Context, headers map[string]string) (*createRespo
defer func() { _ = resp.Body.Close() }()
if resp.StatusCode != http.StatusOK {
return nil, protect.StatusError(errCreateRoomFailed, resp, 1024)
return nil, fmt.Errorf("create room status: %w", protect.StatusError(errCreateRoomFailed, resp, 1024))
}
var res createResponse
@@ -172,7 +172,7 @@ func preconnect(ctx context.Context, roomID, password string, headers map[string
defer func() { _ = preResp.Body.Close() }()
if preResp.StatusCode != http.StatusOK {
return "", protect.StatusError(errPreconnectFailed, preResp, 1024)
return "", fmt.Errorf("preconnect status: %w", protect.StatusError(errPreconnectFailed, preResp, 1024))
}
var preconnectResp struct {

View File

@@ -68,7 +68,7 @@ func GetConnectionInfo(ctx context.Context, roomURL, displayName string) (*Conne
defer func() { _ = resp.Body.Close() }()
if resp.StatusCode != http.StatusOK {
return nil, protect.StatusError(ErrAPI, resp, 4096)
return nil, fmt.Errorf("telemost api status: %w", protect.StatusError(ErrAPI, resp, 4096))
}
var info ConnectionInfo

View File

@@ -83,7 +83,7 @@ func registerGuest(ctx context.Context, displayName string) (string, error) {
defer func() { _ = resp.Body.Close() }()
if resp.StatusCode != http.StatusOK {
return "", protect.StatusError(errGuestRegister, resp, 4096)
return "", fmt.Errorf("guest register status: %w", protect.StatusError(errGuestRegister, resp, 4096))
}
var res guestRegisterResponse
@@ -120,7 +120,7 @@ func createRoom(ctx context.Context, accessToken string) (string, error) {
defer func() { _ = resp.Body.Close() }()
if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusCreated {
return "", protect.StatusError(errCreateRoom, resp, 4096)
return "", fmt.Errorf("create room status: %w", protect.StatusError(errCreateRoom, resp, 4096))
}
var res createRoomResponse
@@ -148,7 +148,7 @@ func joinRoom(ctx context.Context, accessToken, roomID string) error {
defer func() { _ = resp.Body.Close() }()
if resp.StatusCode != http.StatusOK {
return protect.StatusError(errJoinRoom, resp, 4096)
return fmt.Errorf("join room status: %w", protect.StatusError(errJoinRoom, resp, 4096))
}
return nil
}
@@ -176,7 +176,7 @@ func getToken(ctx context.Context, accessToken, roomID, displayName string) (tok
defer func() { _ = resp.Body.Close() }()
if resp.StatusCode != http.StatusOK {
return tokenResponse{}, protect.StatusError(errGetToken, resp, 4096)
return tokenResponse{}, fmt.Errorf("get token status: %w", protect.StatusError(errGetToken, resp, 4096))
}
var res tokenResponse

View File

@@ -524,6 +524,7 @@ func TestShutdownClosesLinkAndConn(t *testing.T) {
}
}
//nolint:cyclop // integration-style control loop test needs setup and async assertions together
func TestStartControlLoopReportsPong(t *testing.T) {
a, b := net.Pipe()
defer func() {

View File

@@ -139,6 +139,7 @@ func TestApplyCLIWins(t *testing.T) {
}
}
//nolint:cyclop // profile merge fixture intentionally checks many mapped fields
func TestLoadAndApplyProfile(t *testing.T) {
dir := t.TempDir()
path := filepath.Join(dir, "olcrtc.yaml")

View File

@@ -160,7 +160,7 @@ func (s *state) readLoop(ctx context.Context) error {
raw, err := readFrame(s.rw)
if err != nil {
if ctx.Err() != nil {
return ctx.Err()
return fmt.Errorf("read loop canceled: %w", ctx.Err())
}
return err
}
@@ -177,7 +177,7 @@ func (s *state) readLoop(ctx context.Context) error {
SentUnixNano: msg.SentUnixNano,
}); err != nil {
if ctx.Err() != nil {
return ctx.Err()
return fmt.Errorf("read loop canceled: %w", ctx.Err())
}
return err
}
@@ -196,7 +196,7 @@ func (s *state) probeLoop(ctx context.Context) error {
for {
select {
case <-ctx.Done():
return ctx.Err()
return fmt.Errorf("probe loop canceled: %w", ctx.Err())
case <-ticker.C:
if err := s.sendProbe(ctx); err != nil {
return err
@@ -270,7 +270,7 @@ func (s *state) handlePong(msg Message) {
func (s *state) enqueue(ctx context.Context, msg Message) error {
select {
case <-ctx.Done():
return ctx.Err()
return fmt.Errorf("enqueue canceled: %w", ctx.Err())
case s.out <- msg:
return nil
}
@@ -280,11 +280,11 @@ func (s *state) writeLoop(ctx context.Context) error {
for {
select {
case <-ctx.Done():
return ctx.Err()
return fmt.Errorf("write loop canceled: %w", ctx.Err())
case msg := <-s.out:
if err := writeFrame(s.rw, msg); err != nil {
if ctx.Err() != nil {
return ctx.Err()
return fmt.Errorf("write loop canceled: %w", ctx.Err())
}
return err
}

View File

@@ -1218,13 +1218,14 @@ func TestSupervisorFailoverProfilesReachWorkingSOCKS(t *testing.T) {
var readyOnce sync.Once
clientErr := make(chan error, 1)
go func() {
clientErr <- supervisor.Run(ctx, failoverE2EConfig(clientProfiles, started, "client"), func(ctx context.Context, cfg session.Config) error {
runClientProfile := func(ctx context.Context, cfg session.Config) error {
return client.RunWithReady(ctx, clientConfigFromSession(cfg, socksAddr), func() {
if cfg.Auth == memoryCarrier {
readyOnce.Do(func() { close(ready) })
}
})
})
}
clientErr <- supervisor.Run(ctx, failoverE2EConfig(clientProfiles, started, "client"), runClientProfile)
}()
waitForReady(t, ready)

View File

@@ -46,8 +46,8 @@ var (
)
type roomHandle interface {
publishData([]byte) error
publishTrack(webrtc.TrackLocal) error
publishData(data []byte) error
publishTrack(track webrtc.TrackLocal) error
unpublishLocalTracks()
disconnect()
connectionState() lksdk.ConnectionState
@@ -58,16 +58,22 @@ type sdkRoom struct {
}
func (r *sdkRoom) publishData(data []byte) error {
return r.room.LocalParticipant.PublishDataPacket(
if err := r.room.LocalParticipant.PublishDataPacket(
lksdk.UserData(data),
lksdk.WithDataPublishTopic(dataPublishTopic),
lksdk.WithDataPublishReliable(true),
)
); err != nil {
return fmt.Errorf("publish data packet: %w", err)
}
return nil
}
func (r *sdkRoom) publishTrack(track webrtc.TrackLocal) error {
_, err := r.room.LocalParticipant.PublishTrack(track, &lksdk.TrackPublicationOptions{Name: videoTrackName})
return err
if err != nil {
return fmt.Errorf("publish track: %w", err)
}
return nil
}
func (r *sdkRoom) unpublishLocalTracks() {
@@ -108,7 +114,7 @@ func connectSDKRoom(url, token string, callback *lksdk.RoomCallback) (roomHandle
lksdk.WithLogger(protoLogger.GetDiscardLogger()),
)
if err != nil {
return nil, err
return nil, fmt.Errorf("connect to livekit room: %w", err)
}
return &sdkRoom{room: room}, nil
}

View File

@@ -12,6 +12,13 @@ import (
"github.com/pion/webrtc/v4"
)
const (
testOldURL = "wss://old"
testOldToken = "old-token"
)
var errFakeConnect = errors.New("boom")
type fakeRoom struct {
mu sync.Mutex
state lksdk.ConnectionState
@@ -123,14 +130,15 @@ func waitFor(t *testing.T, cond func() bool) {
t.Fatal("condition was not met before timeout")
}
//nolint:cyclop // reconnect flow test keeps setup and postconditions in one scenario
func TestReconnectRefreshesCredentialsAndReplacesRoom(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
refreshes := 0
sess, err := New(ctx, engine.Config{
URL: "wss://old",
Token: "old-token",
URL: testOldURL,
Token: testOldToken,
Refresh: func(context.Context) (engine.Credentials, error) {
refreshes++
return engine.Credentials{URL: "wss://new", Token: "new-token"}, nil
@@ -139,7 +147,10 @@ func TestReconnectRefreshesCredentialsAndReplacesRoom(t *testing.T) {
if err != nil {
t.Fatalf("New() error = %v", err)
}
s := sess.(*Session)
s, ok := sess.(*Session)
if !ok {
t.Fatalf("New() type = %T, want *Session", sess)
}
connector := newFakeConnector()
s.connectRoom = connector.connect
@@ -163,10 +174,10 @@ func TestReconnectRefreshesCredentialsAndReplacesRoom(t *testing.T) {
}
urls, tokens := connector.snapshot()
if got, want := urls, []string{"wss://old", "wss://new"}; !equalStrings(got, want) {
if got, want := urls, []string{testOldURL, "wss://new"}; !equalStrings(got, want) {
t.Fatalf("connect urls = %v, want %v", got, want)
}
if got, want := tokens, []string{"old-token", "new-token"}; !equalStrings(got, want) {
if got, want := tokens, []string{testOldToken, "new-token"}; !equalStrings(got, want) {
t.Fatalf("connect tokens = %v, want %v", got, want)
}
if refreshes != 1 {
@@ -188,13 +199,17 @@ func TestReconnectRefreshesCredentialsAndReplacesRoom(t *testing.T) {
}
}
//nolint:cyclop // terminal disconnect test keeps setup and cleanup assertions together
func TestDisconnectedEndsWhenReconnectDisallowed(t *testing.T) {
ctx := context.Background()
sess, err := New(ctx, engine.Config{URL: "wss://old", Token: "old-token"})
sess, err := New(ctx, engine.Config{URL: testOldURL, Token: testOldToken})
if err != nil {
t.Fatalf("New() error = %v", err)
}
s := sess.(*Session)
s, ok := sess.(*Session)
if !ok {
t.Fatalf("New() type = %T, want *Session", sess)
}
connector := newFakeConnector()
s.connectRoom = connector.connect
s.SetShouldReconnect(func() bool { return false })
@@ -264,7 +279,7 @@ func TestCanSendRequiresConnectedRoomAndQueueHeadroom(t *testing.T) {
t.Fatal("CanSend() = false for connected room")
}
for i := 0; i < defaultSendQueueCapHard; i++ {
for range defaultSendQueueCapHard {
s.sendQueue <- []byte("x")
}
if s.CanSend() {
@@ -277,11 +292,11 @@ func TestReconnectFailureRetriesUntilContextDone(t *testing.T) {
defer cancel()
s := &Session{
url: "wss://old",
token: "old-token",
url: testOldURL,
token: testOldToken,
connectRoom: func(string, string, *lksdk.RoomCallback) (roomHandle, error) {
cancel()
return nil, errors.New("boom")
return nil, errFakeConnect
},
reconnectCh: make(chan struct{}, 1),
closeCh: make(chan struct{}),

View File

@@ -114,7 +114,11 @@ func TestNewForwardsConfigAndMethods(t *testing.T) {
if !ln.CanSend() {
t.Fatal("CanSend() = false, want true")
}
if features := ln.(link.FeaturesProvider).Features(); features.MaxPayloadSize != 4096 {
provider, ok := ln.(link.FeaturesProvider)
if !ok {
t.Fatalf("New() type = %T, want link.FeaturesProvider", ln)
}
if features := provider.Features(); features.MaxPayloadSize != 4096 {
t.Fatalf("Features() = %+v, want shaped max payload 4096", features)
}
}

View File

@@ -33,7 +33,7 @@ var (
`[^",\s}]+`,
)
sensitiveBearerRE = regexp.MustCompile(`(?i)(bearer\s+)[A-Za-z0-9._~+/=-]+`)
) //nolint:gochecknoglobals // compiled once for provider error redaction
)
// Protector is called with a socket file descriptor before connect.
// On Android, this calls VpnService.protect(fd) to bypass VPN routing.

View File

@@ -379,6 +379,7 @@ func TestReinstallSessionFiresOnClose(t *testing.T) {
}
}
//nolint:cyclop // integration-style control loop test needs setup and async assertions together
func TestStartControlLoopReportsPong(t *testing.T) {
a, b := net.Pipe()
defer func() {

View File

@@ -10,7 +10,10 @@ import (
"github.com/openlibrecommunity/olcrtc/internal/app/session"
)
// DefaultRetryDelay is used between profile attempts when Config.RetryDelay is unset.
const DefaultRetryDelay = 2 * time.Second
// DefaultHistoryLimit bounds emitted status history when Config.HistoryLimit is unset.
const DefaultHistoryLimit = 20
const (
@@ -25,6 +28,7 @@ var (
ErrNoProfiles = errors.New("supervisor: no profiles configured")
// ErrMaxCyclesExceeded is returned after MaxCycles complete profile-list passes.
ErrMaxCyclesExceeded = errors.New("supervisor: max failover cycles exceeded")
errProfileCleanEnd = errors.New("profile ended")
)
// Profile is one runnable session configuration in an ordered failover list.
@@ -91,39 +95,72 @@ func Run(ctx context.Context, cfg Config, run Runner) error {
var lastErr error
for cycle := 1; ; cycle++ {
for i, profile := range cfg.Profiles {
if ctx.Err() != nil {
return nil
}
state.start(i, cycle)
if cfg.OnProfileStart != nil {
cfg.OnProfileStart(profile, cycle)
}
err := run(ctx, profile.Config)
if ctx.Err() != nil {
return nil
}
if err != nil {
lastErr = fmt.Errorf("profile %q: %w", profile.Name, err)
} else {
lastErr = fmt.Errorf("profile %q ended", profile.Name)
}
state.end(i, cycle, err)
if cfg.OnProfileEnd != nil {
cfg.OnProfileEnd(profile, cycle, err)
}
if cfg.MaxCycles > 0 && cycle >= cfg.MaxCycles && i == len(cfg.Profiles)-1 {
return fmt.Errorf("%w after %d cycle(s): %w", ErrMaxCyclesExceeded, cycle, lastErr)
}
if err := waitRetryDelay(ctx, cfg.RetryDelay); err != nil {
return nil
}
if err := runCycle(ctx, cfg, run, state, cycle, &lastErr); err != nil {
return err
}
}
}
func runCycle(
ctx context.Context,
cfg Config,
run Runner,
state *statusTracker,
cycle int,
lastErr *error,
) error {
for i, profile := range cfg.Profiles {
if err := runProfile(ctx, cfg, run, state, cycle, i, profile, lastErr); err != nil {
return err
}
}
return nil
}
func runProfile(
ctx context.Context,
cfg Config,
run Runner,
state *statusTracker,
cycle int,
profileIndex int,
profile Profile,
lastErr *error,
) error {
if ctx.Err() != nil {
return nil //nolint:nilerr // context cancellation is normal supervisor shutdown
}
state.start(profileIndex, cycle)
if cfg.OnProfileStart != nil {
cfg.OnProfileStart(profile, cycle)
}
err := run(ctx, profile.Config)
if ctx.Err() != nil {
return nil //nolint:nilerr // context cancellation is normal supervisor shutdown
}
*lastErr = profileResultError(profile.Name, err)
state.end(profileIndex, cycle, err)
if cfg.OnProfileEnd != nil {
cfg.OnProfileEnd(profile, cycle, err)
}
if cfg.MaxCycles > 0 && cycle >= cfg.MaxCycles && profileIndex == len(cfg.Profiles)-1 {
return fmt.Errorf("%w after %d cycle(s): %w", ErrMaxCyclesExceeded, cycle, *lastErr)
}
if err := waitRetryDelay(ctx, cfg.RetryDelay); err != nil {
return nil //nolint:nilerr // context cancellation during retry delay is normal shutdown
}
return nil
}
func profileResultError(name string, err error) error {
if err != nil {
return fmt.Errorf("profile %q: %w", name, err)
}
return fmt.Errorf("profile %q: %w", name, errProfileCleanEnd)
}
type statusTracker struct {
status Status
notify func(Status)
@@ -222,7 +259,7 @@ func waitRetryDelay(ctx context.Context, delay time.Duration) error {
defer timer.Stop()
select {
case <-ctx.Done():
return ctx.Err()
return fmt.Errorf("retry delay canceled: %w", ctx.Err())
case <-timer.C:
return nil
}

View File

@@ -11,6 +11,12 @@ import (
var errRunnerBoom = errors.New("boom")
const (
testProfileFirst = "first"
testProfileSecond = "second"
testProfileOne = "one"
)
func TestRunRequiresProfiles(t *testing.T) {
err := Run(context.Background(), Config{}, func(context.Context, session.Config) error { return nil })
if !errors.Is(err, ErrNoProfiles) {
@@ -20,8 +26,8 @@ func TestRunRequiresProfiles(t *testing.T) {
func TestRunAdvancesProfilesAndStopsAtMaxCycles(t *testing.T) {
profiles := []Profile{
{Name: "first", Config: session.Config{Auth: "wbstream"}},
{Name: "second", Config: session.Config{Auth: "jitsi"}},
{Name: testProfileFirst, Config: session.Config{Auth: "wbstream"}},
{Name: testProfileSecond, Config: session.Config{Auth: "jitsi"}},
}
var started []string
var ended []string
@@ -50,18 +56,19 @@ func TestRunAdvancesProfilesAndStopsAtMaxCycles(t *testing.T) {
if !errors.Is(err, ErrMaxCyclesExceeded) {
t.Fatalf("Run() error = %v, want %v", err, ErrMaxCyclesExceeded)
}
if got, want := started, []string{"first", "second"}; !equalStrings(got, want) {
if got, want := started, []string{testProfileFirst, testProfileSecond}; !equalStrings(got, want) {
t.Fatalf("started = %v, want %v", got, want)
}
if got, want := ended, []string{"first", "second"}; !equalStrings(got, want) {
if got, want := ended, []string{testProfileFirst, testProfileSecond}; !equalStrings(got, want) {
t.Fatalf("ended = %v, want %v", got, want)
}
}
//nolint:cyclop // status history test verifies one complete failover cycle
func TestRunEmitsStatusHistory(t *testing.T) {
profiles := []Profile{
{Name: "first", Config: session.Config{Auth: "wbstream"}},
{Name: "second", Config: session.Config{Auth: "jitsi"}},
{Name: testProfileFirst, Config: session.Config{Auth: "wbstream"}},
{Name: testProfileSecond, Config: session.Config{Auth: "jitsi"}},
}
var snapshots []Status
err := Run(context.Background(), Config{
@@ -73,7 +80,7 @@ func TestRunEmitsStatusHistory(t *testing.T) {
snapshots = append(snapshots, status)
},
}, func(_ context.Context, cfg session.Config) error {
if cfg.Auth == "first" {
if cfg.Auth == testProfileFirst {
t.Fatal("runner received profile name instead of config")
}
return errRunnerBoom
@@ -85,7 +92,7 @@ func TestRunEmitsStatusHistory(t *testing.T) {
t.Fatalf("status snapshots = %d, want 4", len(snapshots))
}
first := snapshots[0]
if first.ActiveProfile != "first" || first.ActiveProfileIndex != 0 || first.Cycle != 1 {
if first.ActiveProfile != testProfileFirst || first.ActiveProfileIndex != 0 || first.Cycle != 1 {
t.Fatalf("first status = %+v", first)
}
if first.Profiles[0].Starts != 1 || first.Profiles[0].LastStarted.IsZero() {
@@ -104,10 +111,10 @@ func TestRunEmitsStatusHistory(t *testing.T) {
if len(last.History) != 3 {
t.Fatalf("history length = %d, want 3", len(last.History))
}
if last.History[0].Type != EventProfileEnd || last.History[0].Profile != "first" {
if last.History[0].Type != EventProfileEnd || last.History[0].Profile != testProfileFirst {
t.Fatalf("oldest bounded history event = %+v", last.History[0])
}
if last.History[2].Type != EventProfileEnd || last.History[2].Profile != "second" ||
if last.History[2].Type != EventProfileEnd || last.History[2].Profile != testProfileSecond ||
last.History[2].Error == "" {
t.Fatalf("last history event = %+v", last.History[2])
}
@@ -117,7 +124,7 @@ func TestRunStatusSnapshotIsImmutable(t *testing.T) {
var first Status
var second Status
err := Run(context.Background(), Config{
Profiles: []Profile{{Name: "one"}},
Profiles: []Profile{{Name: testProfileOne}},
RetryDelay: -1,
MaxCycles: 1,
OnStatus: func(status Status) {
@@ -138,7 +145,7 @@ func TestRunStatusSnapshotIsImmutable(t *testing.T) {
if first.Profiles[0].Starts != 99 || first.History[0].Profile != "mutated" {
t.Fatalf("test mutation did not apply to snapshot: %+v", first)
}
if second.Profiles[0].Starts != 1 || second.History[0].Profile != "one" {
if second.Profiles[0].Starts != 1 || second.History[0].Profile != testProfileOne {
t.Fatalf("snapshot mutation leaked into later status: %+v", second)
}
}
@@ -146,7 +153,7 @@ func TestRunStatusSnapshotIsImmutable(t *testing.T) {
func TestRunReturnsNilOnContextCancel(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
err := Run(ctx, Config{
Profiles: []Profile{{Name: "one"}},
Profiles: []Profile{{Name: testProfileOne}},
RetryDelay: time.Hour,
}, func(context.Context, session.Config) error {
cancel()

View File

@@ -9,8 +9,15 @@ import (
"time"
)
// ErrTrafficPayloadTooLarge is returned when Send receives a payload above the configured cap.
var ErrTrafficPayloadTooLarge = errors.New("traffic payload exceeds max_payload_size")
var (
errTrafficConnect = errors.New("traffic connect failed")
errTrafficSend = errors.New("traffic send failed")
errTrafficClose = errors.New("traffic close failed")
)
type trafficTransport struct {
inner Transport
maxPayloadSize int
@@ -43,7 +50,12 @@ func effectiveTrafficConfig(features Features, cfg TrafficConfig) TrafficConfig
return cfg
}
func (t *trafficTransport) Connect(ctx context.Context) error { return t.inner.Connect(ctx) }
func (t *trafficTransport) Connect(ctx context.Context) error {
if err := t.inner.Connect(ctx); err != nil {
return fmt.Errorf("%w: %w", errTrafficConnect, err)
}
return nil
}
func (t *trafficTransport) Send(data []byte) error {
t.sendMu.Lock()
@@ -54,10 +66,18 @@ func (t *trafficTransport) Send(data []byte) error {
if delay := t.nextDelay(); delay > 0 {
time.Sleep(delay)
}
return t.inner.Send(data)
if err := t.inner.Send(data); err != nil {
return fmt.Errorf("%w: %w", errTrafficSend, err)
}
return nil
}
func (t *trafficTransport) Close() error { return t.inner.Close() }
func (t *trafficTransport) Close() error {
if err := t.inner.Close(); err != nil {
return fmt.Errorf("%w: %w", errTrafficClose, err)
}
return nil
}
func (t *trafficTransport) SetReconnectCallback(cb func()) { t.inner.SetReconnectCallback(cb) }

View File

@@ -77,6 +77,7 @@ func TestProtectorAndLogging(t *testing.T) {
}
}
//nolint:cyclop // compact setter smoke test verifies several related defaults together
func TestDefaultsAndSetters(t *testing.T) {
resetMobileGlobals(t)
@@ -182,7 +183,8 @@ func TestStartWithInjectedRunnerLifecycle(t *testing.T) {
cfg.Liveness.Timeout != 750*time.Millisecond ||
cfg.Liveness.Failures != 4 {
t.Fatalf(
"RunWithReady args mismatch: link=%q transport=%q carrier=%q room=%q client=%q local=%q dns=%q vp8=%d/%d liveness=%+v",
"RunWithReady args mismatch: link=%q transport=%q carrier=%q room=%q client=%q "+
"local=%q dns=%q vp8=%d/%d liveness=%+v",
cfg.Link, cfg.Transport, cfg.Carrier, cfg.RoomURL, cfg.DeviceID,
cfg.LocalAddr, cfg.DNSServer, cfg.VP8FPS, cfg.VP8BatchSize, cfg.Liveness,
)