From 6222896921cb361348cdbb7313f07fda8d24868f Mon Sep 17 00:00:00 2001 From: zarazaex69 Date: Sat, 16 May 2026 04:06:55 +0300 Subject: [PATCH] refactor: improve error context and test clarity --- cmd/olcrtc/main_test.go | 4 +- internal/app/session/session.go | 55 +++++++------- internal/app/session/session_test.go | 18 +++-- internal/auth/salutejazz/api.go | 4 +- internal/auth/telemost/api.go | 2 +- internal/auth/wbstream/api.go | 8 +- internal/client/client_test.go | 1 + internal/config/config_test.go | 1 + internal/control/control.go | 12 +-- internal/e2e/tunnel_test.go | 5 +- internal/engine/livekit/livekit.go | 18 +++-- internal/engine/livekit/livekit_test.go | 37 +++++++--- internal/link/direct/direct_test.go | 6 +- internal/protect/protect.go | 2 +- internal/server/server_test.go | 1 + internal/supervisor/supervisor.go | 97 +++++++++++++++++-------- internal/supervisor/supervisor_test.go | 33 +++++---- internal/transport/traffic.go | 26 ++++++- mobile/mobile_test.go | 4 +- 19 files changed, 214 insertions(+), 120 deletions(-) diff --git a/cmd/olcrtc/main_test.go b/cmd/olcrtc/main_test.go index 96a4aeb..465b13b 100644 --- a/cmd/olcrtc/main_test.go +++ b/cmd/olcrtc/main_test.go @@ -161,7 +161,7 @@ func TestRunWithArgsAppliesTransportDefaults(t *testing.T) { oldRunSession := runSession t.Cleanup(func() { runSession = oldRunSession }) - runSession = func(ctx context.Context, cfg session.Config) error { + runSession = func(_ context.Context, cfg session.Config) error { if cfg.VP8FPS != 25 || cfg.VP8BatchSize != 1 { t.Fatalf("VP8 defaults = fps %d batch %d, want 25/1", cfg.VP8FPS, cfg.VP8BatchSize) } @@ -200,7 +200,7 @@ func TestRunWithArgsFailoverProfiles(t *testing.T) { oldRunSession := runSession t.Cleanup(func() { runSession = oldRunSession }) var seen []string - runSession = func(ctx context.Context, cfg session.Config) error { + runSession = func(_ context.Context, cfg session.Config) error { seen = append(seen, cfg.Auth+"/"+cfg.Transport) if cfg.Auth == "wbstream" && (cfg.VP8FPS != 25 || cfg.VP8BatchSize != 1) { t.Fatalf("VP8 defaults = fps %d batch %d, want 25/1", cfg.VP8FPS, cfg.VP8BatchSize) diff --git a/internal/app/session/session.go b/internal/app/session/session.go index 8df7b65..4925e05 100644 --- a/internal/app/session/session.go +++ b/internal/app/session/session.go @@ -56,7 +56,7 @@ const ( defaultSEIAckTimeoutMS = 2000 ) -var sessionRestartDelay = 2 * time.Second +var sessionRestartDelay = 2 * time.Second //nolint:gochecknoglobals // tests shorten lifecycle rotation delay var ( // ErrRoomIDRequired indicates that a room id is required for the selected carrier. @@ -147,6 +147,8 @@ var ( // ErrTrafficMaxDelayInvalid indicates that traffic.max_delay is not a non-negative duration. ErrTrafficMaxDelayInvalid = errors.New( "invalid traffic max delay (set traffic.max_delay to a duration >= 0 and >= traffic.min_delay)") + errPositiveDuration = errors.New("duration must be > 0") + errNonNegativeDuration = errors.New("duration must be >= 0") ) // Config holds runtime session settings. @@ -264,20 +266,15 @@ func applyVideoDefaults(cfg Config) Config { if cfg.VideoCodec == "" { cfg.VideoCodec = videoCodecQRCode } + width := defaultVideoWidth if cfg.VideoCodec == videoCodecTile { - if cfg.VideoWidth == 0 { - cfg.VideoWidth = 1080 - } - if cfg.VideoHeight == 0 { - cfg.VideoHeight = 1080 - } - } else { - if cfg.VideoWidth == 0 { - cfg.VideoWidth = defaultVideoWidth - } - if cfg.VideoHeight == 0 { - cfg.VideoHeight = defaultVideoHeight - } + width = defaultVideoHeight + } + if cfg.VideoWidth == 0 { + cfg.VideoWidth = width + } + if cfg.VideoHeight == 0 { + cfg.VideoHeight = defaultVideoHeight } if cfg.VideoFPS == 0 { cfg.VideoFPS = defaultVideoFPS @@ -490,10 +487,10 @@ func validateModeConfig(cfg Config) error { func validateLivenessConfig(cfg Config) error { if _, err := parseLivenessDuration(cfg.LivenessInterval, control.DefaultInterval); err != nil { - return fmt.Errorf("%w: %v", ErrLivenessIntervalInvalid, err) + return fmt.Errorf("%w: %w", ErrLivenessIntervalInvalid, err) } if _, err := parseLivenessDuration(cfg.LivenessTimeout, control.DefaultTimeout); err != nil { - return fmt.Errorf("%w: %v", ErrLivenessTimeoutInvalid, err) + return fmt.Errorf("%w: %w", ErrLivenessTimeoutInvalid, err) } if cfg.LivenessFailures < 0 { return ErrLivenessFailuresInvalid @@ -514,10 +511,10 @@ func parseLivenessDuration(value string, def time.Duration) (time.Duration, erro } d, err := time.ParseDuration(value) if err != nil { - return 0, err + return 0, fmt.Errorf("parse duration: %w", err) } if d <= 0 { - return 0, fmt.Errorf("duration must be > 0") + return 0, errPositiveDuration } return d, nil } @@ -525,11 +522,11 @@ func parseLivenessDuration(value string, def time.Duration) (time.Duration, erro func livenessConfig(cfg Config) (control.Config, error) { interval, err := parseLivenessDuration(cfg.LivenessInterval, control.DefaultInterval) if err != nil { - return control.Config{}, fmt.Errorf("%w: %v", ErrLivenessIntervalInvalid, err) + return control.Config{}, fmt.Errorf("%w: %w", ErrLivenessIntervalInvalid, err) } timeout, err := parseLivenessDuration(cfg.LivenessTimeout, control.DefaultTimeout) if err != nil { - return control.Config{}, fmt.Errorf("%w: %v", ErrLivenessTimeoutInvalid, err) + return control.Config{}, fmt.Errorf("%w: %w", ErrLivenessTimeoutInvalid, err) } failures := cfg.LivenessFailures if failures == 0 { @@ -547,7 +544,7 @@ func maxSessionDuration(cfg Config) (time.Duration, error) { } d, err := time.ParseDuration(cfg.MaxSessionDuration) if err != nil { - return 0, fmt.Errorf("%w: %v", ErrLifecycleMaxSessionDurationInvalid, err) + return 0, fmt.Errorf("%w: %w", ErrLifecycleMaxSessionDurationInvalid, err) } if d <= 0 { return 0, ErrLifecycleMaxSessionDurationInvalid @@ -567,11 +564,11 @@ func trafficConfig(cfg Config) (transport.TrafficConfig, error) { } minDelay, err := parseOptionalNonNegativeDuration(cfg.TrafficMinDelay) if err != nil { - return transport.TrafficConfig{}, fmt.Errorf("%w: %v", ErrTrafficMinDelayInvalid, err) + return transport.TrafficConfig{}, fmt.Errorf("%w: %w", ErrTrafficMinDelayInvalid, err) } maxDelay, err := parseOptionalNonNegativeDuration(cfg.TrafficMaxDelay) if err != nil { - return transport.TrafficConfig{}, fmt.Errorf("%w: %v", ErrTrafficMaxDelayInvalid, err) + return transport.TrafficConfig{}, fmt.Errorf("%w: %w", ErrTrafficMaxDelayInvalid, err) } if maxDelay > 0 && maxDelay < minDelay { return transport.TrafficConfig{}, ErrTrafficMaxDelayInvalid @@ -589,10 +586,10 @@ func parseOptionalNonNegativeDuration(value string) (time.Duration, error) { } d, err := time.ParseDuration(value) if err != nil { - return 0, err + return 0, fmt.Errorf("parse duration: %w", err) } if d < 0 { - return 0, fmt.Errorf("duration must be >= 0") + return 0, errNonNegativeDuration } return d, nil } @@ -740,7 +737,7 @@ func runWithSessionRotation(ctx context.Context, maxDuration time.Duration, run cancel() timer.Stop() if ctx.Err() != nil { - return nil + return nil //nolint:nilerr // parent cancellation is normal shutdown for rotation } if rotated.Load() { if err != nil { @@ -748,7 +745,7 @@ func runWithSessionRotation(ctx context.Context, maxDuration time.Duration, run } logger.Infof("session rotation restarting: next_cycle=%d", currentCycle+1) if err := waitSessionRestart(ctx); err != nil { - return nil + return nil //nolint:nilerr // canceled restart delay means normal shutdown } continue } @@ -757,7 +754,7 @@ func runWithSessionRotation(ctx context.Context, maxDuration time.Duration, run } logger.Infof("session ended cleanly with lifecycle rotation enabled: next_cycle=%d", currentCycle+1) if err := waitSessionRestart(ctx); err != nil { - return nil + return nil //nolint:nilerr // canceled restart delay means normal shutdown } } } @@ -765,7 +762,7 @@ func runWithSessionRotation(ctx context.Context, maxDuration time.Duration, run func waitSessionRestart(ctx context.Context) error { select { case <-ctx.Done(): - return ctx.Err() + return fmt.Errorf("restart delay canceled: %w", ctx.Err()) case <-time.After(sessionRestartDelay): return nil } diff --git a/internal/app/session/session_test.go b/internal/app/session/session_test.go index d75371b..c2581f6 100644 --- a/internal/app/session/session_test.go +++ b/internal/app/session/session_test.go @@ -11,6 +11,8 @@ import ( "github.com/openlibrecommunity/olcrtc/internal/crypto" ) +const testBadDuration = "nope" + func TestApplyTransportDefaults(t *testing.T) { tests := []struct { name string @@ -42,7 +44,7 @@ func TestApplyTransportDefaults(t *testing.T) { VideoHeight: 1080, VideoFPS: 30, VideoBitrate: "2M", - VideoHW: "none", + VideoHW: defaultVideoHW, VideoQRRecovery: "low", VideoCodec: videoCodecQRCode, }, @@ -56,7 +58,7 @@ func TestApplyTransportDefaults(t *testing.T) { VideoHeight: 1080, VideoFPS: 30, VideoBitrate: "2M", - VideoHW: "none", + VideoHW: defaultVideoHW, VideoQRRecovery: "low", VideoCodec: videoCodecTile, }, @@ -253,7 +255,7 @@ func TestValidate(t *testing.T) { cfg.VideoHeight = 480 cfg.VideoFPS = 30 cfg.VideoBitrate = "1M" - cfg.VideoHW = "none" //nolint:goconst // test literal, repetition is intentional + cfg.VideoHW = defaultVideoHW cfg.VideoCodec = "bogus" return cfg }(), @@ -314,7 +316,7 @@ func TestValidate(t *testing.T) { cfg.VideoHeight = 480 cfg.VideoFPS = 30 cfg.VideoBitrate = "1M" - cfg.VideoHW = "none" + cfg.VideoHW = defaultVideoHW cfg.VideoCodec = "tile" return cfg }(), @@ -329,7 +331,7 @@ func TestValidate(t *testing.T) { cfg.VideoHeight = 1080 cfg.VideoFPS = 30 cfg.VideoBitrate = "1M" - cfg.VideoHW = "none" + cfg.VideoHW = defaultVideoHW cfg.VideoCodec = "tile" return cfg }(), @@ -474,7 +476,7 @@ func TestValidate(t *testing.T) { name: "liveness rejects bad interval", cfg: func() Config { cfg := base - cfg.LivenessInterval = "nope" + cfg.LivenessInterval = testBadDuration return cfg }(), want: ErrLivenessIntervalInvalid, @@ -509,7 +511,7 @@ func TestValidate(t *testing.T) { name: "lifecycle rejects bad max session duration", cfg: func() Config { cfg := base - cfg.MaxSessionDuration = "nope" + cfg.MaxSessionDuration = testBadDuration return cfg }(), want: ErrLifecycleMaxSessionDurationInvalid, @@ -555,7 +557,7 @@ func TestValidate(t *testing.T) { name: "traffic rejects bad min delay", cfg: func() Config { cfg := base - cfg.TrafficMinDelay = "nope" + cfg.TrafficMinDelay = testBadDuration return cfg }(), want: ErrTrafficMinDelayInvalid, diff --git a/internal/auth/salutejazz/api.go b/internal/auth/salutejazz/api.go index 40cd092..1137b06 100644 --- a/internal/auth/salutejazz/api.go +++ b/internal/auth/salutejazz/api.go @@ -120,7 +120,7 @@ func createMeeting(ctx context.Context, headers map[string]string) (*createRespo defer func() { _ = resp.Body.Close() }() if resp.StatusCode != http.StatusOK { - return nil, protect.StatusError(errCreateRoomFailed, resp, 1024) + return nil, fmt.Errorf("create room status: %w", protect.StatusError(errCreateRoomFailed, resp, 1024)) } var res createResponse @@ -172,7 +172,7 @@ func preconnect(ctx context.Context, roomID, password string, headers map[string defer func() { _ = preResp.Body.Close() }() if preResp.StatusCode != http.StatusOK { - return "", protect.StatusError(errPreconnectFailed, preResp, 1024) + return "", fmt.Errorf("preconnect status: %w", protect.StatusError(errPreconnectFailed, preResp, 1024)) } var preconnectResp struct { diff --git a/internal/auth/telemost/api.go b/internal/auth/telemost/api.go index a9b1116..7babca1 100644 --- a/internal/auth/telemost/api.go +++ b/internal/auth/telemost/api.go @@ -68,7 +68,7 @@ func GetConnectionInfo(ctx context.Context, roomURL, displayName string) (*Conne defer func() { _ = resp.Body.Close() }() if resp.StatusCode != http.StatusOK { - return nil, protect.StatusError(ErrAPI, resp, 4096) + return nil, fmt.Errorf("telemost api status: %w", protect.StatusError(ErrAPI, resp, 4096)) } var info ConnectionInfo diff --git a/internal/auth/wbstream/api.go b/internal/auth/wbstream/api.go index ea1a927..9e0a74a 100644 --- a/internal/auth/wbstream/api.go +++ b/internal/auth/wbstream/api.go @@ -83,7 +83,7 @@ func registerGuest(ctx context.Context, displayName string) (string, error) { defer func() { _ = resp.Body.Close() }() if resp.StatusCode != http.StatusOK { - return "", protect.StatusError(errGuestRegister, resp, 4096) + return "", fmt.Errorf("guest register status: %w", protect.StatusError(errGuestRegister, resp, 4096)) } var res guestRegisterResponse @@ -120,7 +120,7 @@ func createRoom(ctx context.Context, accessToken string) (string, error) { defer func() { _ = resp.Body.Close() }() if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusCreated { - return "", protect.StatusError(errCreateRoom, resp, 4096) + return "", fmt.Errorf("create room status: %w", protect.StatusError(errCreateRoom, resp, 4096)) } var res createRoomResponse @@ -148,7 +148,7 @@ func joinRoom(ctx context.Context, accessToken, roomID string) error { defer func() { _ = resp.Body.Close() }() if resp.StatusCode != http.StatusOK { - return protect.StatusError(errJoinRoom, resp, 4096) + return fmt.Errorf("join room status: %w", protect.StatusError(errJoinRoom, resp, 4096)) } return nil } @@ -176,7 +176,7 @@ func getToken(ctx context.Context, accessToken, roomID, displayName string) (tok defer func() { _ = resp.Body.Close() }() if resp.StatusCode != http.StatusOK { - return tokenResponse{}, protect.StatusError(errGetToken, resp, 4096) + return tokenResponse{}, fmt.Errorf("get token status: %w", protect.StatusError(errGetToken, resp, 4096)) } var res tokenResponse diff --git a/internal/client/client_test.go b/internal/client/client_test.go index 40b3c22..9f624f8 100644 --- a/internal/client/client_test.go +++ b/internal/client/client_test.go @@ -524,6 +524,7 @@ func TestShutdownClosesLinkAndConn(t *testing.T) { } } +//nolint:cyclop // integration-style control loop test needs setup and async assertions together func TestStartControlLoopReportsPong(t *testing.T) { a, b := net.Pipe() defer func() { diff --git a/internal/config/config_test.go b/internal/config/config_test.go index c699283..cd6d871 100644 --- a/internal/config/config_test.go +++ b/internal/config/config_test.go @@ -139,6 +139,7 @@ func TestApplyCLIWins(t *testing.T) { } } +//nolint:cyclop // profile merge fixture intentionally checks many mapped fields func TestLoadAndApplyProfile(t *testing.T) { dir := t.TempDir() path := filepath.Join(dir, "olcrtc.yaml") diff --git a/internal/control/control.go b/internal/control/control.go index d799518..7d82f04 100644 --- a/internal/control/control.go +++ b/internal/control/control.go @@ -160,7 +160,7 @@ func (s *state) readLoop(ctx context.Context) error { raw, err := readFrame(s.rw) if err != nil { if ctx.Err() != nil { - return ctx.Err() + return fmt.Errorf("read loop canceled: %w", ctx.Err()) } return err } @@ -177,7 +177,7 @@ func (s *state) readLoop(ctx context.Context) error { SentUnixNano: msg.SentUnixNano, }); err != nil { if ctx.Err() != nil { - return ctx.Err() + return fmt.Errorf("read loop canceled: %w", ctx.Err()) } return err } @@ -196,7 +196,7 @@ func (s *state) probeLoop(ctx context.Context) error { for { select { case <-ctx.Done(): - return ctx.Err() + return fmt.Errorf("probe loop canceled: %w", ctx.Err()) case <-ticker.C: if err := s.sendProbe(ctx); err != nil { return err @@ -270,7 +270,7 @@ func (s *state) handlePong(msg Message) { func (s *state) enqueue(ctx context.Context, msg Message) error { select { case <-ctx.Done(): - return ctx.Err() + return fmt.Errorf("enqueue canceled: %w", ctx.Err()) case s.out <- msg: return nil } @@ -280,11 +280,11 @@ func (s *state) writeLoop(ctx context.Context) error { for { select { case <-ctx.Done(): - return ctx.Err() + return fmt.Errorf("write loop canceled: %w", ctx.Err()) case msg := <-s.out: if err := writeFrame(s.rw, msg); err != nil { if ctx.Err() != nil { - return ctx.Err() + return fmt.Errorf("write loop canceled: %w", ctx.Err()) } return err } diff --git a/internal/e2e/tunnel_test.go b/internal/e2e/tunnel_test.go index deb9f44..46fcf57 100644 --- a/internal/e2e/tunnel_test.go +++ b/internal/e2e/tunnel_test.go @@ -1218,13 +1218,14 @@ func TestSupervisorFailoverProfilesReachWorkingSOCKS(t *testing.T) { var readyOnce sync.Once clientErr := make(chan error, 1) go func() { - clientErr <- supervisor.Run(ctx, failoverE2EConfig(clientProfiles, started, "client"), func(ctx context.Context, cfg session.Config) error { + runClientProfile := func(ctx context.Context, cfg session.Config) error { return client.RunWithReady(ctx, clientConfigFromSession(cfg, socksAddr), func() { if cfg.Auth == memoryCarrier { readyOnce.Do(func() { close(ready) }) } }) - }) + } + clientErr <- supervisor.Run(ctx, failoverE2EConfig(clientProfiles, started, "client"), runClientProfile) }() waitForReady(t, ready) diff --git a/internal/engine/livekit/livekit.go b/internal/engine/livekit/livekit.go index ad7e64d..80b4aab 100644 --- a/internal/engine/livekit/livekit.go +++ b/internal/engine/livekit/livekit.go @@ -46,8 +46,8 @@ var ( ) type roomHandle interface { - publishData([]byte) error - publishTrack(webrtc.TrackLocal) error + publishData(data []byte) error + publishTrack(track webrtc.TrackLocal) error unpublishLocalTracks() disconnect() connectionState() lksdk.ConnectionState @@ -58,16 +58,22 @@ type sdkRoom struct { } func (r *sdkRoom) publishData(data []byte) error { - return r.room.LocalParticipant.PublishDataPacket( + if err := r.room.LocalParticipant.PublishDataPacket( lksdk.UserData(data), lksdk.WithDataPublishTopic(dataPublishTopic), lksdk.WithDataPublishReliable(true), - ) + ); err != nil { + return fmt.Errorf("publish data packet: %w", err) + } + return nil } func (r *sdkRoom) publishTrack(track webrtc.TrackLocal) error { _, err := r.room.LocalParticipant.PublishTrack(track, &lksdk.TrackPublicationOptions{Name: videoTrackName}) - return err + if err != nil { + return fmt.Errorf("publish track: %w", err) + } + return nil } func (r *sdkRoom) unpublishLocalTracks() { @@ -108,7 +114,7 @@ func connectSDKRoom(url, token string, callback *lksdk.RoomCallback) (roomHandle lksdk.WithLogger(protoLogger.GetDiscardLogger()), ) if err != nil { - return nil, err + return nil, fmt.Errorf("connect to livekit room: %w", err) } return &sdkRoom{room: room}, nil } diff --git a/internal/engine/livekit/livekit_test.go b/internal/engine/livekit/livekit_test.go index 7a46fd5..9f30431 100644 --- a/internal/engine/livekit/livekit_test.go +++ b/internal/engine/livekit/livekit_test.go @@ -12,6 +12,13 @@ import ( "github.com/pion/webrtc/v4" ) +const ( + testOldURL = "wss://old" + testOldToken = "old-token" +) + +var errFakeConnect = errors.New("boom") + type fakeRoom struct { mu sync.Mutex state lksdk.ConnectionState @@ -123,14 +130,15 @@ func waitFor(t *testing.T, cond func() bool) { t.Fatal("condition was not met before timeout") } +//nolint:cyclop // reconnect flow test keeps setup and postconditions in one scenario func TestReconnectRefreshesCredentialsAndReplacesRoom(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() refreshes := 0 sess, err := New(ctx, engine.Config{ - URL: "wss://old", - Token: "old-token", + URL: testOldURL, + Token: testOldToken, Refresh: func(context.Context) (engine.Credentials, error) { refreshes++ return engine.Credentials{URL: "wss://new", Token: "new-token"}, nil @@ -139,7 +147,10 @@ func TestReconnectRefreshesCredentialsAndReplacesRoom(t *testing.T) { if err != nil { t.Fatalf("New() error = %v", err) } - s := sess.(*Session) + s, ok := sess.(*Session) + if !ok { + t.Fatalf("New() type = %T, want *Session", sess) + } connector := newFakeConnector() s.connectRoom = connector.connect @@ -163,10 +174,10 @@ func TestReconnectRefreshesCredentialsAndReplacesRoom(t *testing.T) { } urls, tokens := connector.snapshot() - if got, want := urls, []string{"wss://old", "wss://new"}; !equalStrings(got, want) { + if got, want := urls, []string{testOldURL, "wss://new"}; !equalStrings(got, want) { t.Fatalf("connect urls = %v, want %v", got, want) } - if got, want := tokens, []string{"old-token", "new-token"}; !equalStrings(got, want) { + if got, want := tokens, []string{testOldToken, "new-token"}; !equalStrings(got, want) { t.Fatalf("connect tokens = %v, want %v", got, want) } if refreshes != 1 { @@ -188,13 +199,17 @@ func TestReconnectRefreshesCredentialsAndReplacesRoom(t *testing.T) { } } +//nolint:cyclop // terminal disconnect test keeps setup and cleanup assertions together func TestDisconnectedEndsWhenReconnectDisallowed(t *testing.T) { ctx := context.Background() - sess, err := New(ctx, engine.Config{URL: "wss://old", Token: "old-token"}) + sess, err := New(ctx, engine.Config{URL: testOldURL, Token: testOldToken}) if err != nil { t.Fatalf("New() error = %v", err) } - s := sess.(*Session) + s, ok := sess.(*Session) + if !ok { + t.Fatalf("New() type = %T, want *Session", sess) + } connector := newFakeConnector() s.connectRoom = connector.connect s.SetShouldReconnect(func() bool { return false }) @@ -264,7 +279,7 @@ func TestCanSendRequiresConnectedRoomAndQueueHeadroom(t *testing.T) { t.Fatal("CanSend() = false for connected room") } - for i := 0; i < defaultSendQueueCapHard; i++ { + for range defaultSendQueueCapHard { s.sendQueue <- []byte("x") } if s.CanSend() { @@ -277,11 +292,11 @@ func TestReconnectFailureRetriesUntilContextDone(t *testing.T) { defer cancel() s := &Session{ - url: "wss://old", - token: "old-token", + url: testOldURL, + token: testOldToken, connectRoom: func(string, string, *lksdk.RoomCallback) (roomHandle, error) { cancel() - return nil, errors.New("boom") + return nil, errFakeConnect }, reconnectCh: make(chan struct{}, 1), closeCh: make(chan struct{}), diff --git a/internal/link/direct/direct_test.go b/internal/link/direct/direct_test.go index f891e88..cc55bea 100644 --- a/internal/link/direct/direct_test.go +++ b/internal/link/direct/direct_test.go @@ -114,7 +114,11 @@ func TestNewForwardsConfigAndMethods(t *testing.T) { if !ln.CanSend() { t.Fatal("CanSend() = false, want true") } - if features := ln.(link.FeaturesProvider).Features(); features.MaxPayloadSize != 4096 { + provider, ok := ln.(link.FeaturesProvider) + if !ok { + t.Fatalf("New() type = %T, want link.FeaturesProvider", ln) + } + if features := provider.Features(); features.MaxPayloadSize != 4096 { t.Fatalf("Features() = %+v, want shaped max payload 4096", features) } } diff --git a/internal/protect/protect.go b/internal/protect/protect.go index 2919fa3..00b38e3 100644 --- a/internal/protect/protect.go +++ b/internal/protect/protect.go @@ -33,7 +33,7 @@ var ( `[^",\s}]+`, ) sensitiveBearerRE = regexp.MustCompile(`(?i)(bearer\s+)[A-Za-z0-9._~+/=-]+`) -) //nolint:gochecknoglobals // compiled once for provider error redaction +) // Protector is called with a socket file descriptor before connect. // On Android, this calls VpnService.protect(fd) to bypass VPN routing. diff --git a/internal/server/server_test.go b/internal/server/server_test.go index 65a2bc5..05bbbf5 100644 --- a/internal/server/server_test.go +++ b/internal/server/server_test.go @@ -379,6 +379,7 @@ func TestReinstallSessionFiresOnClose(t *testing.T) { } } +//nolint:cyclop // integration-style control loop test needs setup and async assertions together func TestStartControlLoopReportsPong(t *testing.T) { a, b := net.Pipe() defer func() { diff --git a/internal/supervisor/supervisor.go b/internal/supervisor/supervisor.go index 293a4eb..30b8a1f 100644 --- a/internal/supervisor/supervisor.go +++ b/internal/supervisor/supervisor.go @@ -10,7 +10,10 @@ import ( "github.com/openlibrecommunity/olcrtc/internal/app/session" ) +// DefaultRetryDelay is used between profile attempts when Config.RetryDelay is unset. const DefaultRetryDelay = 2 * time.Second + +// DefaultHistoryLimit bounds emitted status history when Config.HistoryLimit is unset. const DefaultHistoryLimit = 20 const ( @@ -25,6 +28,7 @@ var ( ErrNoProfiles = errors.New("supervisor: no profiles configured") // ErrMaxCyclesExceeded is returned after MaxCycles complete profile-list passes. ErrMaxCyclesExceeded = errors.New("supervisor: max failover cycles exceeded") + errProfileCleanEnd = errors.New("profile ended") ) // Profile is one runnable session configuration in an ordered failover list. @@ -91,39 +95,72 @@ func Run(ctx context.Context, cfg Config, run Runner) error { var lastErr error for cycle := 1; ; cycle++ { - for i, profile := range cfg.Profiles { - if ctx.Err() != nil { - return nil - } - state.start(i, cycle) - if cfg.OnProfileStart != nil { - cfg.OnProfileStart(profile, cycle) - } - - err := run(ctx, profile.Config) - if ctx.Err() != nil { - return nil - } - if err != nil { - lastErr = fmt.Errorf("profile %q: %w", profile.Name, err) - } else { - lastErr = fmt.Errorf("profile %q ended", profile.Name) - } - state.end(i, cycle, err) - if cfg.OnProfileEnd != nil { - cfg.OnProfileEnd(profile, cycle, err) - } - - if cfg.MaxCycles > 0 && cycle >= cfg.MaxCycles && i == len(cfg.Profiles)-1 { - return fmt.Errorf("%w after %d cycle(s): %w", ErrMaxCyclesExceeded, cycle, lastErr) - } - if err := waitRetryDelay(ctx, cfg.RetryDelay); err != nil { - return nil - } + if err := runCycle(ctx, cfg, run, state, cycle, &lastErr); err != nil { + return err } } } +func runCycle( + ctx context.Context, + cfg Config, + run Runner, + state *statusTracker, + cycle int, + lastErr *error, +) error { + for i, profile := range cfg.Profiles { + if err := runProfile(ctx, cfg, run, state, cycle, i, profile, lastErr); err != nil { + return err + } + } + return nil +} + +func runProfile( + ctx context.Context, + cfg Config, + run Runner, + state *statusTracker, + cycle int, + profileIndex int, + profile Profile, + lastErr *error, +) error { + if ctx.Err() != nil { + return nil //nolint:nilerr // context cancellation is normal supervisor shutdown + } + state.start(profileIndex, cycle) + if cfg.OnProfileStart != nil { + cfg.OnProfileStart(profile, cycle) + } + + err := run(ctx, profile.Config) + if ctx.Err() != nil { + return nil //nolint:nilerr // context cancellation is normal supervisor shutdown + } + *lastErr = profileResultError(profile.Name, err) + state.end(profileIndex, cycle, err) + if cfg.OnProfileEnd != nil { + cfg.OnProfileEnd(profile, cycle, err) + } + + if cfg.MaxCycles > 0 && cycle >= cfg.MaxCycles && profileIndex == len(cfg.Profiles)-1 { + return fmt.Errorf("%w after %d cycle(s): %w", ErrMaxCyclesExceeded, cycle, *lastErr) + } + if err := waitRetryDelay(ctx, cfg.RetryDelay); err != nil { + return nil //nolint:nilerr // context cancellation during retry delay is normal shutdown + } + return nil +} + +func profileResultError(name string, err error) error { + if err != nil { + return fmt.Errorf("profile %q: %w", name, err) + } + return fmt.Errorf("profile %q: %w", name, errProfileCleanEnd) +} + type statusTracker struct { status Status notify func(Status) @@ -222,7 +259,7 @@ func waitRetryDelay(ctx context.Context, delay time.Duration) error { defer timer.Stop() select { case <-ctx.Done(): - return ctx.Err() + return fmt.Errorf("retry delay canceled: %w", ctx.Err()) case <-timer.C: return nil } diff --git a/internal/supervisor/supervisor_test.go b/internal/supervisor/supervisor_test.go index 253d310..b0b14e9 100644 --- a/internal/supervisor/supervisor_test.go +++ b/internal/supervisor/supervisor_test.go @@ -11,6 +11,12 @@ import ( var errRunnerBoom = errors.New("boom") +const ( + testProfileFirst = "first" + testProfileSecond = "second" + testProfileOne = "one" +) + func TestRunRequiresProfiles(t *testing.T) { err := Run(context.Background(), Config{}, func(context.Context, session.Config) error { return nil }) if !errors.Is(err, ErrNoProfiles) { @@ -20,8 +26,8 @@ func TestRunRequiresProfiles(t *testing.T) { func TestRunAdvancesProfilesAndStopsAtMaxCycles(t *testing.T) { profiles := []Profile{ - {Name: "first", Config: session.Config{Auth: "wbstream"}}, - {Name: "second", Config: session.Config{Auth: "jitsi"}}, + {Name: testProfileFirst, Config: session.Config{Auth: "wbstream"}}, + {Name: testProfileSecond, Config: session.Config{Auth: "jitsi"}}, } var started []string var ended []string @@ -50,18 +56,19 @@ func TestRunAdvancesProfilesAndStopsAtMaxCycles(t *testing.T) { if !errors.Is(err, ErrMaxCyclesExceeded) { t.Fatalf("Run() error = %v, want %v", err, ErrMaxCyclesExceeded) } - if got, want := started, []string{"first", "second"}; !equalStrings(got, want) { + if got, want := started, []string{testProfileFirst, testProfileSecond}; !equalStrings(got, want) { t.Fatalf("started = %v, want %v", got, want) } - if got, want := ended, []string{"first", "second"}; !equalStrings(got, want) { + if got, want := ended, []string{testProfileFirst, testProfileSecond}; !equalStrings(got, want) { t.Fatalf("ended = %v, want %v", got, want) } } +//nolint:cyclop // status history test verifies one complete failover cycle func TestRunEmitsStatusHistory(t *testing.T) { profiles := []Profile{ - {Name: "first", Config: session.Config{Auth: "wbstream"}}, - {Name: "second", Config: session.Config{Auth: "jitsi"}}, + {Name: testProfileFirst, Config: session.Config{Auth: "wbstream"}}, + {Name: testProfileSecond, Config: session.Config{Auth: "jitsi"}}, } var snapshots []Status err := Run(context.Background(), Config{ @@ -73,7 +80,7 @@ func TestRunEmitsStatusHistory(t *testing.T) { snapshots = append(snapshots, status) }, }, func(_ context.Context, cfg session.Config) error { - if cfg.Auth == "first" { + if cfg.Auth == testProfileFirst { t.Fatal("runner received profile name instead of config") } return errRunnerBoom @@ -85,7 +92,7 @@ func TestRunEmitsStatusHistory(t *testing.T) { t.Fatalf("status snapshots = %d, want 4", len(snapshots)) } first := snapshots[0] - if first.ActiveProfile != "first" || first.ActiveProfileIndex != 0 || first.Cycle != 1 { + if first.ActiveProfile != testProfileFirst || first.ActiveProfileIndex != 0 || first.Cycle != 1 { t.Fatalf("first status = %+v", first) } if first.Profiles[0].Starts != 1 || first.Profiles[0].LastStarted.IsZero() { @@ -104,10 +111,10 @@ func TestRunEmitsStatusHistory(t *testing.T) { if len(last.History) != 3 { t.Fatalf("history length = %d, want 3", len(last.History)) } - if last.History[0].Type != EventProfileEnd || last.History[0].Profile != "first" { + if last.History[0].Type != EventProfileEnd || last.History[0].Profile != testProfileFirst { t.Fatalf("oldest bounded history event = %+v", last.History[0]) } - if last.History[2].Type != EventProfileEnd || last.History[2].Profile != "second" || + if last.History[2].Type != EventProfileEnd || last.History[2].Profile != testProfileSecond || last.History[2].Error == "" { t.Fatalf("last history event = %+v", last.History[2]) } @@ -117,7 +124,7 @@ func TestRunStatusSnapshotIsImmutable(t *testing.T) { var first Status var second Status err := Run(context.Background(), Config{ - Profiles: []Profile{{Name: "one"}}, + Profiles: []Profile{{Name: testProfileOne}}, RetryDelay: -1, MaxCycles: 1, OnStatus: func(status Status) { @@ -138,7 +145,7 @@ func TestRunStatusSnapshotIsImmutable(t *testing.T) { if first.Profiles[0].Starts != 99 || first.History[0].Profile != "mutated" { t.Fatalf("test mutation did not apply to snapshot: %+v", first) } - if second.Profiles[0].Starts != 1 || second.History[0].Profile != "one" { + if second.Profiles[0].Starts != 1 || second.History[0].Profile != testProfileOne { t.Fatalf("snapshot mutation leaked into later status: %+v", second) } } @@ -146,7 +153,7 @@ func TestRunStatusSnapshotIsImmutable(t *testing.T) { func TestRunReturnsNilOnContextCancel(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) err := Run(ctx, Config{ - Profiles: []Profile{{Name: "one"}}, + Profiles: []Profile{{Name: testProfileOne}}, RetryDelay: time.Hour, }, func(context.Context, session.Config) error { cancel() diff --git a/internal/transport/traffic.go b/internal/transport/traffic.go index 31f194b..9ef0f73 100644 --- a/internal/transport/traffic.go +++ b/internal/transport/traffic.go @@ -9,8 +9,15 @@ import ( "time" ) +// ErrTrafficPayloadTooLarge is returned when Send receives a payload above the configured cap. var ErrTrafficPayloadTooLarge = errors.New("traffic payload exceeds max_payload_size") +var ( + errTrafficConnect = errors.New("traffic connect failed") + errTrafficSend = errors.New("traffic send failed") + errTrafficClose = errors.New("traffic close failed") +) + type trafficTransport struct { inner Transport maxPayloadSize int @@ -43,7 +50,12 @@ func effectiveTrafficConfig(features Features, cfg TrafficConfig) TrafficConfig return cfg } -func (t *trafficTransport) Connect(ctx context.Context) error { return t.inner.Connect(ctx) } +func (t *trafficTransport) Connect(ctx context.Context) error { + if err := t.inner.Connect(ctx); err != nil { + return fmt.Errorf("%w: %w", errTrafficConnect, err) + } + return nil +} func (t *trafficTransport) Send(data []byte) error { t.sendMu.Lock() @@ -54,10 +66,18 @@ func (t *trafficTransport) Send(data []byte) error { if delay := t.nextDelay(); delay > 0 { time.Sleep(delay) } - return t.inner.Send(data) + if err := t.inner.Send(data); err != nil { + return fmt.Errorf("%w: %w", errTrafficSend, err) + } + return nil } -func (t *trafficTransport) Close() error { return t.inner.Close() } +func (t *trafficTransport) Close() error { + if err := t.inner.Close(); err != nil { + return fmt.Errorf("%w: %w", errTrafficClose, err) + } + return nil +} func (t *trafficTransport) SetReconnectCallback(cb func()) { t.inner.SetReconnectCallback(cb) } diff --git a/mobile/mobile_test.go b/mobile/mobile_test.go index 2498103..812c537 100644 --- a/mobile/mobile_test.go +++ b/mobile/mobile_test.go @@ -77,6 +77,7 @@ func TestProtectorAndLogging(t *testing.T) { } } +//nolint:cyclop // compact setter smoke test verifies several related defaults together func TestDefaultsAndSetters(t *testing.T) { resetMobileGlobals(t) @@ -182,7 +183,8 @@ func TestStartWithInjectedRunnerLifecycle(t *testing.T) { cfg.Liveness.Timeout != 750*time.Millisecond || cfg.Liveness.Failures != 4 { t.Fatalf( - "RunWithReady args mismatch: link=%q transport=%q carrier=%q room=%q client=%q local=%q dns=%q vp8=%d/%d liveness=%+v", + "RunWithReady args mismatch: link=%q transport=%q carrier=%q room=%q client=%q "+ + "local=%q dns=%q vp8=%d/%d liveness=%+v", cfg.Link, cfg.Transport, cfg.Carrier, cfg.RoomURL, cfg.DeviceID, cfg.LocalAddr, cfg.DNSServer, cfg.VP8FPS, cfg.VP8BatchSize, cfg.Liveness, )