diff --git a/cmd/olcrtc/main.go b/cmd/olcrtc/main.go index 7fa128c..f49dc1c 100644 --- a/cmd/olcrtc/main.go +++ b/cmd/olcrtc/main.go @@ -6,14 +6,15 @@ import ( "errors" "flag" "fmt" + "io" "os" "os/signal" "path/filepath" "syscall" "time" - lksdk "github.com/livekit/server-sdk-go/v2" protoLogger "github.com/livekit/protocol/logger" + lksdk "github.com/livekit/server-sdk-go/v2" "github.com/openlibrecommunity/olcrtc/internal/app/session" "github.com/openlibrecommunity/olcrtc/internal/logger" "github.com/openlibrecommunity/olcrtc/internal/names" @@ -22,27 +23,30 @@ import ( // ErrDataDirRequired is returned when no data directory is specified. var ErrDataDirRequired = errors.New("data directory required (use -data data)") +//nolint:gochecknoglobals // Tests replace the long-running session runner with a bounded function. +var runSession = session.Run + type config struct { - mode string - link string - transport string - carrier string - roomID string - clientID string - provider string - socksPort int - socksHost string - keyHex string - debug bool - dataDir string - dnsServer string - socksProxyAddr string - socksProxyPort int - videoWidth int - videoHeight int - videoFPS int - videoBitrate string - videoHW string + mode string + link string + transport string + carrier string + roomID string + clientID string + provider string + socksPort int + socksHost string + keyHex string + debug bool + dataDir string + dnsServer string + socksProxyAddr string + socksProxyPort int + videoWidth int + videoHeight int + videoFPS int + videoBitrate string + videoHW string videoQRSize int videoQRRecovery string videoCodec string @@ -60,9 +64,20 @@ func main() { } func run() error { + return runWithArgs(os.Args[1:]) +} + +func runWithArgs(args []string) error { session.RegisterDefaults() - cfg := parseFlags() + cfg, err := parseFlagsFrom(args, flag.ExitOnError) + if err != nil { + return err + } + return runWithConfig(cfg) +} + +func runWithConfig(cfg config) error { configureLogging(cfg.debug) if err := session.Validate(toSessionConfig(cfg)); err != nil { @@ -90,7 +105,7 @@ func run() error { errCh := make(chan error, 1) go func() { - errCh <- session.Run(ctx, toSessionConfig(cfg)) + errCh <- runSession(ctx, toSessionConfig(cfg)) }() select { @@ -104,43 +119,51 @@ func run() error { } func parseFlags() config { - cfg := config{} - - flag.StringVar(&cfg.mode, "mode", "", "Mode: srv or cnc") - flag.StringVar(&cfg.link, "link", "", "Link: direct (p2p connection type)") - flag.StringVar(&cfg.transport, "transport", "", "Transport: datachannel, videochannel, seichannel") - flag.StringVar(&cfg.carrier, "carrier", "", "Carrier: telemost, jazz, wbstream") - flag.StringVar(&cfg.roomID, "id", "", "Room ID") - flag.StringVar(&cfg.clientID, "client-id", "", "Client ID: binds one srv to one cnc (required)") - flag.StringVar(&cfg.provider, "provider", "", "Deprecated alias for -carrier") - flag.IntVar(&cfg.socksPort, "socks-port", 0, "SOCKS5 port (client only)") - flag.StringVar(&cfg.socksHost, "socks-host", "", "SOCKS5 listen host (client only)") - flag.StringVar(&cfg.keyHex, "key", "", "Shared encryption key (hex)") - flag.BoolVar(&cfg.debug, "debug", false, "Enable verbose logging") - flag.StringVar(&cfg.dataDir, "data", "", "Path to data directory") - flag.StringVar(&cfg.dnsServer, "dns", "", "DNS server (e.g. 1.1.1.1:53)") - flag.StringVar(&cfg.socksProxyAddr, "socks-proxy", "", "SOCKS5 proxy address (server only)") - flag.IntVar(&cfg.socksProxyPort, "socks-proxy-port", 0, "SOCKS5 proxy port (server only)") - flag.IntVar(&cfg.videoWidth, "video-w", 0, "Video logical width (videochannel only)") - flag.IntVar(&cfg.videoHeight, "video-h", 0, "Video logical height (videochannel only)") - flag.IntVar(&cfg.videoFPS, "video-fps", 0, "Video frames per second (videochannel only)") - flag.StringVar(&cfg.videoBitrate, "video-bitrate", "", "Video bitrate (videochannel only)") - flag.StringVar(&cfg.videoHW, "video-hw", "", "Hardware acceleration (none, nvenc)") - flag.IntVar(&cfg.videoQRSize, "video-qr-size", 0, "Video QR code fragment size (videochannel only)") - flag.StringVar(&cfg.videoQRRecovery, "video-qr-recovery", "low", - "QR error correction: low (7%), medium (15%), high (25%), highest (30%)") - flag.StringVar(&cfg.videoCodec, "video-codec", "qrcode", "Visual codec: qrcode or tile") - flag.IntVar(&cfg.videoTileModule, "video-tile-module", 0, - "Tile module size in pixels 1..270 (videochannel tile only, default 4)") - flag.IntVar(&cfg.videoTileRS, "video-tile-rs", 0, - "Tile Reed-Solomon parity percent 0..200 (videochannel tile only, default 20)") - flag.IntVar(&cfg.vp8FPS, "vp8-fps", 0, "VP8 frames per second (vp8channel only, default 25)") - flag.IntVar(&cfg.vp8BatchSize, "vp8-batch", 0, "VP8 frames per tick (vp8channel only, default 1)") - flag.Parse() - + cfg, _ := parseFlagsFrom(os.Args[1:], flag.ExitOnError) return cfg } +func parseFlagsFrom(args []string, errorHandling flag.ErrorHandling) (config, error) { + cfg := config{} + fs := flag.NewFlagSet("olcrtc", errorHandling) + if errorHandling == flag.ContinueOnError { + fs.SetOutput(io.Discard) + } + + fs.StringVar(&cfg.mode, "mode", "", "Mode: srv or cnc") + fs.StringVar(&cfg.link, "link", "", "Link: direct (p2p connection type)") + fs.StringVar(&cfg.transport, "transport", "", "Transport: datachannel, videochannel, seichannel") + fs.StringVar(&cfg.carrier, "carrier", "", "Carrier: telemost, jazz, wbstream") + fs.StringVar(&cfg.roomID, "id", "", "Room ID") + fs.StringVar(&cfg.clientID, "client-id", "", "Client ID: binds one srv to one cnc (required)") + fs.StringVar(&cfg.provider, "provider", "", "Deprecated alias for -carrier") + fs.IntVar(&cfg.socksPort, "socks-port", 0, "SOCKS5 port (client only)") + fs.StringVar(&cfg.socksHost, "socks-host", "", "SOCKS5 listen host (client only)") + fs.StringVar(&cfg.keyHex, "key", "", "Shared encryption key (hex)") + fs.BoolVar(&cfg.debug, "debug", false, "Enable verbose logging") + fs.StringVar(&cfg.dataDir, "data", "", "Path to data directory") + fs.StringVar(&cfg.dnsServer, "dns", "", "DNS server (e.g. 1.1.1.1:53)") + fs.StringVar(&cfg.socksProxyAddr, "socks-proxy", "", "SOCKS5 proxy address (server only)") + fs.IntVar(&cfg.socksProxyPort, "socks-proxy-port", 0, "SOCKS5 proxy port (server only)") + fs.IntVar(&cfg.videoWidth, "video-w", 0, "Video logical width (videochannel only)") + fs.IntVar(&cfg.videoHeight, "video-h", 0, "Video logical height (videochannel only)") + fs.IntVar(&cfg.videoFPS, "video-fps", 0, "Video frames per second (videochannel only)") + fs.StringVar(&cfg.videoBitrate, "video-bitrate", "", "Video bitrate (videochannel only)") + fs.StringVar(&cfg.videoHW, "video-hw", "", "Hardware acceleration (none, nvenc)") + fs.IntVar(&cfg.videoQRSize, "video-qr-size", 0, "Video QR code fragment size (videochannel only)") + fs.StringVar(&cfg.videoQRRecovery, "video-qr-recovery", "low", + "QR error correction: low (7%), medium (15%), high (25%), highest (30%)") + fs.StringVar(&cfg.videoCodec, "video-codec", "qrcode", "Visual codec: qrcode or tile") + fs.IntVar(&cfg.videoTileModule, "video-tile-module", 0, + "Tile module size in pixels 1..270 (videochannel tile only, default 4)") + fs.IntVar(&cfg.videoTileRS, "video-tile-rs", 0, + "Tile Reed-Solomon parity percent 0..200 (videochannel tile only, default 20)") + fs.IntVar(&cfg.vp8FPS, "vp8-fps", 0, "VP8 frames per second (vp8channel only, default 25)") + fs.IntVar(&cfg.vp8BatchSize, "vp8-batch", 0, "VP8 frames per tick (vp8channel only, default 1)") + + return cfg, fs.Parse(args) +} + func configureLogging(debug bool) { if debug { logger.SetVerbose(true) diff --git a/cmd/olcrtc/main_test.go b/cmd/olcrtc/main_test.go index 8467992..26f4258 100644 --- a/cmd/olcrtc/main_test.go +++ b/cmd/olcrtc/main_test.go @@ -1,7 +1,9 @@ package main import ( + "context" "errors" + "flag" "os" "path/filepath" "testing" @@ -58,6 +60,117 @@ func TestToSessionConfigAndFirstNonEmpty(t *testing.T) { } } +func TestParseFlagsFrom(t *testing.T) { + cfg, err := parseFlagsFrom([]string{ + "-mode", "srv", + "-link", "direct", + "-transport", "vp8channel", + "-carrier", "telemost", + "-id", "room", + "-client-id", "client", + "-socks-port", "1080", + "-socks-host", "127.0.0.1", + "-key", "key", + "-debug", + "-data", "data", + "-dns", "9.9.9.9:53", + "-socks-proxy", "proxy", + "-socks-proxy-port", "1081", + "-video-w", "640", + "-video-h", "480", + "-video-fps", "30", + "-video-bitrate", "1M", + "-video-hw", "none", + "-video-qr-size", "128", + "-video-qr-recovery", "high", + "-video-codec", "tile", + "-video-tile-module", "6", + "-video-tile-rs", "40", + "-vp8-fps", "24", + "-vp8-batch", "3", + }, flag.ContinueOnError) + if err != nil { + t.Fatalf("parseFlagsFrom() error = %v", err) + } + if cfg.mode != "srv" || cfg.carrier != "telemost" || cfg.roomID != "room" || + cfg.debug != true || cfg.videoCodec != "tile" || cfg.videoTileRS != 40 || + cfg.vp8FPS != 24 || cfg.vp8BatchSize != 3 { + t.Fatalf("parseFlagsFrom() = %+v", cfg) + } + + _, err = parseFlagsFrom([]string{"-bad"}, flag.ContinueOnError) + if err == nil { + t.Fatal("parseFlagsFrom(bad flag) error = nil") + } +} + +func TestRunWithConfigValidationAndDataDirErrors(t *testing.T) { + session.RegisterDefaults() + cfg := config{ + mode: "srv", + link: "direct", + transport: "datachannel", + carrier: "jazz", + clientID: "client", + keyHex: "key", + dnsServer: "1.1.1.1:53", + videoCodec: "qrcode", + } + if err := runWithConfig(cfg); !errors.Is(err, ErrDataDirRequired) { + t.Fatalf("runWithConfig(no data dir) = %v, want %v", err, ErrDataDirRequired) + } + + cfg.mode = "" + if err := runWithConfig(cfg); err == nil { + t.Fatal("runWithConfig(invalid config) error = nil") + } +} + +func TestRunWithArgsSuccessfulSessionReturn(t *testing.T) { + dir := t.TempDir() + if err := os.WriteFile(filepath.Join(dir, "names"), []byte("A\n"), 0o600); err != nil { + t.Fatalf("WriteFile(names) error = %v", err) + } + if err := os.WriteFile(filepath.Join(dir, "surnames"), []byte("B\n"), 0o600); err != nil { + t.Fatalf("WriteFile(surnames) error = %v", err) + } + + oldRunSession := runSession + t.Cleanup(func() { + runSession = oldRunSession + }) + called := false + runSession = func(ctx context.Context, cfg session.Config) error { + called = true + if cfg.Mode != "srv" || cfg.Carrier != "jazz" || cfg.ClientID != "client" { + t.Fatalf("session config = %+v", cfg) + } + select { + case <-ctx.Done(): + t.Fatal("context canceled before session returned") + default: + } + return nil + } + + err := runWithArgs([]string{ + "-mode", "srv", + "-link", "direct", + "-transport", "datachannel", + "-carrier", "jazz", + "-client-id", "client", + "-key", "key", + "-dns", "1.1.1.1:53", + "-data", dir, + }) + if err != nil { + t.Fatalf("runWithArgs() error = %v", err) + } + if !called { + t.Fatal("runWithArgs() did not call session runner") + } +} + func TestConfigureLogging(t *testing.T) { logger.SetVerbose(false) configureLogging(true) diff --git a/internal/transport/seichannel/transport_unit_test.go b/internal/transport/seichannel/transport_unit_test.go new file mode 100644 index 0000000..5f4c1f1 --- /dev/null +++ b/internal/transport/seichannel/transport_unit_test.go @@ -0,0 +1,159 @@ +package seichannel + +import ( + "context" + "errors" + "hash/crc32" + "testing" + "time" + + "github.com/openlibrecommunity/olcrtc/internal/carrier" + "github.com/openlibrecommunity/olcrtc/internal/transport" + "github.com/pion/webrtc/v4" +) + +type fakeVideoSession struct { + stream *fakeVideoStream + err error +} + +func (s *fakeVideoSession) Capabilities() carrier.Capabilities { + return carrier.Capabilities{VideoTrack: true} +} +func (s *fakeVideoSession) OpenVideoTrack() (carrier.VideoTrack, error) { + if s.err != nil { + return nil, s.err + } + return s.stream, nil +} + +type fakeVideoStream struct { + connectErr error + closeErr error + canSend bool + + trackAdded bool + trackCB func(*webrtc.TrackRemote, *webrtc.RTPReceiver) + reconnect func() + should func() bool + ended func(string) + watched bool + closed bool +} + +func (s *fakeVideoStream) Connect(context.Context) error { return s.connectErr } +func (s *fakeVideoStream) Close() error { + s.closed = true + return s.closeErr +} +func (s *fakeVideoStream) SetReconnectCallback(cb func()) { s.reconnect = cb } +func (s *fakeVideoStream) SetShouldReconnect(fn func() bool) { s.should = fn } +func (s *fakeVideoStream) SetEndedCallback(cb func(string)) { s.ended = cb } +func (s *fakeVideoStream) WatchConnection(context.Context) { s.watched = true } +func (s *fakeVideoStream) CanSend() bool { return s.canSend } +func (s *fakeVideoStream) AddTrack(webrtc.TrackLocal) error { s.trackAdded = true; return nil } +func (s *fakeVideoStream) SetTrackHandler(cb func(*webrtc.TrackRemote, *webrtc.RTPReceiver)) { + s.trackCB = cb +} + +type nonVideoSession struct{} + +func (s *nonVideoSession) Capabilities() carrier.Capabilities { return carrier.Capabilities{} } + +func TestNewConnectCallbacksAndFeatures(t *testing.T) { + stream := &fakeVideoStream{canSend: true} + name := "seichannel-unit-new" + carrier.Register(name, func(context.Context, carrier.Config) (carrier.Session, error) { + return &fakeVideoSession{stream: stream}, nil + }) + + trIface, err := New(context.Background(), transport.Config{Carrier: name}) + if err != nil { + t.Fatalf("New() error = %v", err) + } + tr := trIface.(*streamTransport) + if !stream.trackAdded || stream.trackCB == nil { + t.Fatal("New() did not attach track and handler") + } + if err := tr.Connect(context.Background()); err != nil { + t.Fatalf("Connect() error = %v", err) + } + if !tr.writerUp.Load() { + t.Fatal("Connect() did not start writer") + } + tr.SetReconnectCallback(func() {}) + tr.SetShouldReconnect(func() bool { return true }) + tr.SetEndedCallback(func(string) {}) + tr.WatchConnection(context.Background()) + if stream.reconnect == nil || stream.should == nil || stream.ended == nil || !stream.watched { + t.Fatal("callbacks/watch were not forwarded") + } + if !tr.CanSend() { + t.Fatal("CanSend() = false, want true") + } + if features := tr.Features(); !features.Reliable || !features.Ordered || !features.MessageOriented || features.MaxPayloadSize == 0 { + t.Fatalf("Features() = %+v", features) + } + if err := tr.Close(); err != nil { + t.Fatalf("Close() error = %v", err) + } +} + +func TestNewErrorPaths(t *testing.T) { + carrier.Register("seichannel-create-fails", func(context.Context, carrier.Config) (carrier.Session, error) { + return nil, errors.New("boom") + }) + if _, err := New(context.Background(), transport.Config{Carrier: "seichannel-create-fails"}); err == nil || err.Error() != "create provider transport: boom" { + t.Fatalf("New() error = %v", err) + } + + carrier.Register("seichannel-no-video", func(context.Context, carrier.Config) (carrier.Session, error) { + return &nonVideoSession{}, nil + }) + if _, err := New(context.Background(), transport.Config{Carrier: "seichannel-no-video"}); !errors.Is(err, ErrVideoTrackUnsupported) { + t.Fatalf("New() error = %v, want %v", err, ErrVideoTrackUnsupported) + } + + carrier.Register("seichannel-open-fails", func(context.Context, carrier.Config) (carrier.Session, error) { + return &fakeVideoSession{err: errors.New("open boom")}, nil + }) + if _, err := New(context.Background(), transport.Config{Carrier: "seichannel-open-fails"}); err == nil || err.Error() != "open video track: open boom" { + t.Fatalf("New() error = %v", err) + } +} + +func TestSendAckAndClosePaths(t *testing.T) { + tr := &streamTransport{ + stream: &fakeVideoStream{canSend: true}, + outbound: make(chan []byte, 8), + outboundAck: make(chan []byte, 8), + closeCh: make(chan struct{}), + writerDone: make(chan struct{}), + ackWaiters: make(map[uint32]chan uint32), + } + + done := make(chan error, 1) + payload := []byte("payload") + go func() { done <- tr.Send(payload) }() + + select { + case frame := <-tr.outbound: + decoded, err := decodeTransportFrame(frame) + if err != nil { + t.Fatalf("decodeTransportFrame() error = %v", err) + } + tr.resolveAck(decoded.seq, crc32.ChecksumIEEE(payload)) + case <-time.After(time.Second): + t.Fatal("Send() did not enqueue frame") + } + + if err := <-done; err != nil { + t.Fatalf("Send() error = %v", err) + } + if err := tr.Close(); err != nil { + t.Fatalf("Close() error = %v", err) + } + if err := tr.Send([]byte("closed")); !errors.Is(err, ErrTransportClosed) { + t.Fatalf("Send(closed) error = %v, want %v", err, ErrTransportClosed) + } +} diff --git a/internal/transport/videochannel/frame_extra_test.go b/internal/transport/videochannel/frame_extra_test.go index d782d94..e55b960 100644 --- a/internal/transport/videochannel/frame_extra_test.go +++ b/internal/transport/videochannel/frame_extra_test.go @@ -76,6 +76,12 @@ func TestCodecSpecsAndArgs(t *testing.T) { if got := resolveEncoderCodec(vp8CodecSpec(), "none"); got != "libvpx" { t.Fatalf("resolveEncoderCodec(vp8,none) = %q", got) } + if got := resolveEncoderCodec(vp9CodecSpec(), "nvenc"); got != "vp9_nvenc" { + t.Fatalf("resolveEncoderCodec(vp9,nvenc) = %q", got) + } + if got := resolveEncoderCodec(codecSpec{mimeType: webrtc.MimeTypeAV1, encoder: "libaom-av1"}, "nvenc"); got != "av1_nvenc" { + t.Fatalf("resolveEncoderCodec(av1,nvenc) = %q", got) + } args := buildEncoderArgs(vp8CodecSpec(), "vp8_nvenc", 320, 240, 30, "1M") for _, want := range []string{"-video_size", "320x240", "-framerate", "30", "vp8_nvenc", "-b:v", "1M", "ivf"} { @@ -87,6 +93,29 @@ func TestCodecSpecsAndArgs(t *testing.T) { if h264Args[len(h264Args)-2] != "h264" { t.Fatalf("h264 encoder args = %v", h264Args) } + + if got := resolveDecoderName(h264CodecSpec(), "nvenc"); got != "h264_cuvid" { + t.Fatalf("resolveDecoderName(h264,nvenc) = %q", got) + } + if got := resolveDecoderName(vp8CodecSpec(), "nvenc"); got != "vp8_cuvid" { + t.Fatalf("resolveDecoderName(vp8,nvenc) = %q", got) + } + if got := resolveDecoderName(vp9CodecSpec(), "nvenc"); got != "vp9_cuvid" { + t.Fatalf("resolveDecoderName(vp9,nvenc) = %q", got) + } + if got := resolveDecoderName(codecSpec{mimeType: "video/custom"}, "none"); got != "custom" { + t.Fatalf("resolveDecoderName(custom,none) = %q", got) + } + decArgs := buildDecoderArgs(vp8CodecSpec(), "vp8", 320, 240, "gray") + for _, want := range []string{"-f", "ivf", "-vcodec", "vp8", "scale=320:240:flags=neighbor,format=gray", "rawvideo"} { + if !slices.Contains(decArgs, want) { + t.Fatalf("buildDecoderArgs(vp8) = %v, missing %q", decArgs, want) + } + } + h264DecArgs := buildDecoderArgs(h264CodecSpec(), "h264", 320, 240, "gray") + if h264DecArgs[5] != "h264" { + t.Fatalf("buildDecoderArgs(h264) = %v", h264DecArgs) + } } type shortWriter struct { @@ -105,6 +134,12 @@ type errWriter struct{} func (w errWriter) Write([]byte) (int, error) { return 0, io.ErrClosedPipe } +type bufferWriteCloser struct { + bytes.Buffer +} + +func (w *bufferWriteCloser) Close() error { return nil } + func TestIVFWritersAndWithStderr(t *testing.T) { var buf bytes.Buffer if err := writeIVFHeader(&buf, "VP80", 320, 240, 30); err != nil { @@ -137,3 +172,83 @@ func TestIVFWritersAndWithStderr(t *testing.T) { t.Fatalf("withStderr(nil) = %v", got) } } + +func TestFFmpegProcessErrAndFrameValidation(t *testing.T) { + enc := &ffmpegEncoder{ + stderr: bytes.NewBufferString("encoder failed"), + frames: make(chan []byte, 1), + frameSize: 4, + } + if _, err := enc.EncodeFrame([]byte("bad")); !errors.Is(err, ErrUnexpectedFrameSize) { + t.Fatalf("EncodeFrame(short) error = %v, want %v", err, ErrUnexpectedFrameSize) + } + enc.setErr(errors.New("boom")) + if _, err := enc.EncodeFrame([]byte("good")); err == nil || !strings.Contains(err.Error(), "encoder failed") { + t.Fatalf("EncodeFrame(processErr) error = %v", err) + } + + dec := &ffmpegDecoder{stderr: bytes.NewBufferString("decoder failed")} + dec.setErr(errors.New("boom")) + if err := dec.PushSample([]byte("sample")); err == nil || !strings.Contains(err.Error(), "decoder failed") { + t.Fatalf("PushSample(processErr) error = %v", err) + } + closed := &ffmpegDecoder{} + closed.closed.Store(true) + if err := closed.processErr(); !errors.Is(err, ErrTransportClosed) { + t.Fatalf("decoder processErr(closed) = %v, want %v", err, ErrTransportClosed) + } +} + +func TestFFmpegReadersAndSampleWriters(t *testing.T) { + var ivf bytes.Buffer + if err := writeIVFHeader(&ivf, "VP80", 2, 2, 30); err != nil { + t.Fatalf("writeIVFHeader() error = %v", err) + } + if err := writeIVFFrame(&ivf, 1, []byte("frame")); err != nil { + t.Fatalf("writeIVFFrame() error = %v", err) + } + enc := &ffmpegEncoder{stderr: &bytes.Buffer{}, frames: make(chan []byte, 2)} + enc.readIVF(&ivf) + if got := <-enc.frames; !bytes.Equal(got, []byte("frame")) { + t.Fatalf("readIVF frame = %q", got) + } + + enc = &ffmpegEncoder{stderr: &bytes.Buffer{}, frames: make(chan []byte, 2)} + enc.readRawH264(bytes.NewBufferString("h264")) + if got := <-enc.frames; !bytes.Equal(got, []byte("h264")) { + t.Fatalf("readRawH264 frame = %q", got) + } + + dec := &ffmpegDecoder{stderr: &bytes.Buffer{}, frames: make(chan []byte, 2), frameSize: 4} + dec.readRawFrames(bytes.NewBufferString("aaaabbbb")) + if got := <-dec.frames; !bytes.Equal(got, []byte("aaaa")) { + t.Fatalf("readRawFrames first = %q", got) + } + if got := <-dec.frames; !bytes.Equal(got, []byte("bbbb")) { + t.Fatalf("readRawFrames second = %q", got) + } + + h264In := &bufferWriteCloser{} + dec = &ffmpegDecoder{stdin: h264In, mimeType: webrtc.MimeTypeH264} + if err := dec.PushSample([]byte("sample")); err != nil { + t.Fatalf("PushSample(h264) error = %v", err) + } + if h264In.String() != "sample" { + t.Fatalf("h264 stdin = %q", h264In.String()) + } + + ivfIn := &bufferWriteCloser{} + dec = &ffmpegDecoder{stdin: ivfIn, mimeType: webrtc.MimeTypeVP8} + if err := dec.PushSample([]byte("vp8")); err != nil { + t.Fatalf("PushSample(vp8) error = %v", err) + } + if ivfIn.Len() != 12+len("vp8") || dec.pts != 1 { + t.Fatalf("ivf stdin len=%d pts=%d", ivfIn.Len(), dec.pts) + } + + dec = &ffmpegDecoder{frames: make(chan []byte, 1)} + dec.frames <- []byte("ready") + if got, err := dec.PopFrame(); err != nil || !bytes.Equal(got, []byte("ready")) { + t.Fatalf("PopFrame() = %q, %v", got, err) + } +} diff --git a/internal/transport/videochannel/transport_unit_test.go b/internal/transport/videochannel/transport_unit_test.go new file mode 100644 index 0000000..b729b05 --- /dev/null +++ b/internal/transport/videochannel/transport_unit_test.go @@ -0,0 +1,226 @@ +package videochannel + +import ( + "context" + "errors" + "hash/crc32" + "testing" + "time" + + "github.com/openlibrecommunity/olcrtc/internal/carrier" + "github.com/openlibrecommunity/olcrtc/internal/transport" + "github.com/pion/webrtc/v4" +) + +type fakeVideoSession struct { + stream *fakeVideoStream + err error +} + +func (s *fakeVideoSession) Capabilities() carrier.Capabilities { + return carrier.Capabilities{VideoTrack: true} +} +func (s *fakeVideoSession) OpenVideoTrack() (carrier.VideoTrack, error) { + if s.err != nil { + return nil, s.err + } + return s.stream, nil +} + +type fakeVideoStream struct { + closeErr error + canSend bool + trackAdded bool + trackCB func(*webrtc.TrackRemote, *webrtc.RTPReceiver) + reconnect func() + should func() bool + ended func(string) + watched bool + closed bool +} + +func (s *fakeVideoStream) Connect(context.Context) error { return nil } +func (s *fakeVideoStream) Close() error { s.closed = true; return s.closeErr } +func (s *fakeVideoStream) SetReconnectCallback(cb func()) { s.reconnect = cb } +func (s *fakeVideoStream) SetShouldReconnect(fn func() bool) { s.should = fn } +func (s *fakeVideoStream) SetEndedCallback(cb func(string)) { s.ended = cb } +func (s *fakeVideoStream) WatchConnection(context.Context) { s.watched = true } +func (s *fakeVideoStream) CanSend() bool { return s.canSend } +func (s *fakeVideoStream) AddTrack(webrtc.TrackLocal) error { s.trackAdded = true; return nil } +func (s *fakeVideoStream) SetTrackHandler(cb func(*webrtc.TrackRemote, *webrtc.RTPReceiver)) { + s.trackCB = cb +} + +type nonVideoSession struct{} + +func (s *nonVideoSession) Capabilities() carrier.Capabilities { return carrier.Capabilities{} } + +func TestNewCallbacksFeaturesAndClose(t *testing.T) { + stream := &fakeVideoStream{canSend: true} + name := "videochannel-unit-new" + carrier.Register(name, func(context.Context, carrier.Config) (carrier.Session, error) { + return &fakeVideoSession{stream: stream}, nil + }) + + trIface, err := New(context.Background(), transport.Config{ + Carrier: name, + VideoWidth: 320, + VideoHeight: 240, + VideoFPS: 30, + VideoBitrate: "1M", + VideoCodec: "qrcode", + VideoTileModule: -1, + VideoTileRS: -1, + }) + if err != nil { + t.Fatalf("New() error = %v", err) + } + tr := trIface.(*streamTransport) + if !stream.trackAdded || stream.trackCB == nil { + t.Fatal("New() did not attach track and handler") + } + tr.SetReconnectCallback(func() {}) + tr.SetShouldReconnect(func() bool { return true }) + tr.SetEndedCallback(func(string) {}) + tr.WatchConnection(context.Background()) + if stream.reconnect == nil || stream.should == nil || stream.ended == nil || !stream.watched { + t.Fatal("callbacks/watch were not forwarded") + } + if !tr.CanSend() { + t.Fatal("CanSend() = false, want true") + } + if features := tr.Features(); !features.Reliable || !features.Ordered || !features.MessageOriented || features.MaxPayloadSize == 0 { + t.Fatalf("Features() = %+v", features) + } + if tr.videoQRSize != defaultFragmentSize || tr.videoTileModule != 4 || tr.videoTileRS != 20 { + t.Fatalf("defaults qr=%d tileModule=%d tileRS=%d", tr.videoQRSize, tr.videoTileModule, tr.videoTileRS) + } + if err := tr.Close(); err != nil { + t.Fatalf("Close() error = %v", err) + } +} + +func TestNewErrorPaths(t *testing.T) { + carrier.Register("videochannel-create-fails", func(context.Context, carrier.Config) (carrier.Session, error) { + return nil, errors.New("boom") + }) + if _, err := New(context.Background(), transport.Config{Carrier: "videochannel-create-fails"}); err == nil || err.Error() != "create provider transport: boom" { + t.Fatalf("New() error = %v", err) + } + + carrier.Register("videochannel-no-video", func(context.Context, carrier.Config) (carrier.Session, error) { + return &nonVideoSession{}, nil + }) + if _, err := New(context.Background(), transport.Config{Carrier: "videochannel-no-video"}); !errors.Is(err, ErrVideoTrackUnsupported) { + t.Fatalf("New() error = %v, want %v", err, ErrVideoTrackUnsupported) + } + + carrier.Register("videochannel-open-fails", func(context.Context, carrier.Config) (carrier.Session, error) { + return &fakeVideoSession{err: errors.New("open boom")}, nil + }) + if _, err := New(context.Background(), transport.Config{Carrier: "videochannel-open-fails"}); err == nil || err.Error() != "open video track: open boom" { + t.Fatalf("New() error = %v", err) + } +} + +func TestSendAckAndClosePaths(t *testing.T) { + tr := &streamTransport{ + stream: &fakeVideoStream{canSend: true}, + outbound: make(chan []byte, 8), + outboundAck: make(chan []byte, 8), + closeCh: make(chan struct{}), + writerDone: make(chan struct{}), + ackWaiters: make(map[uint32]chan uint32), + videoQRSize: 4, + } + + done := make(chan error, 1) + payload := []byte("payload") + go func() { done <- tr.Send(payload) }() + + select { + case frame := <-tr.outbound: + decoded, err := decodeTransportFrame(frame) + if err != nil { + t.Fatalf("decodeTransportFrame() error = %v", err) + } + tr.resolveAck(decoded.seq, crc32.ChecksumIEEE(payload)) + case <-time.After(time.Second): + t.Fatal("Send() did not enqueue frame") + } + + if err := <-done; err != nil { + t.Fatalf("Send() error = %v", err) + } + if err := tr.Close(); err != nil { + t.Fatalf("Close() error = %v", err) + } + if err := tr.Send([]byte("closed")); !errors.Is(err, ErrTransportClosed) { + t.Fatalf("Send(closed) error = %v, want %v", err, ErrTransportClosed) + } +} + +func TestOutboundPriorityRenderAndClosedEnqueue(t *testing.T) { + tr := &streamTransport{ + stream: &fakeVideoStream{canSend: true}, + outbound: make(chan []byte, 2), + outboundAck: make(chan []byte, 2), + closeCh: make(chan struct{}), + writerDone: make(chan struct{}), + videoW: 16, + videoH: 16, + videoQRRecovery: "highest", + videoCodec: "qrcode", + videoTileModule: 4, + videoTileRS: 20, + } + + if err := tr.enqueueFrame([]byte("data"), false); err != nil { + t.Fatalf("enqueueFrame(data) error = %v", err) + } + if err := tr.enqueueFrame([]byte("ack"), true); err != nil { + t.Fatalf("enqueueFrame(ack) error = %v", err) + } + if got, ok := tr.nextOutboundFrame(); !ok || string(got) != "ack" { + t.Fatalf("first nextOutboundFrame() = %q/%v, want ack/true", got, ok) + } + if got, ok := tr.nextOutboundFrame(); !ok || string(got) != "data" { + t.Fatalf("second nextOutboundFrame() = %q/%v, want data/true", got, ok) + } + if got, ok := tr.nextOutboundFrame(); !ok || got != nil { + t.Fatalf("idle nextOutboundFrame() = %q/%v, want nil/true", got, ok) + } + + idle, err := tr.renderFrame(nil) + if err != nil { + t.Fatalf("renderFrame(nil) error = %v", err) + } + if len(idle) != tr.videoW*tr.videoH { + t.Fatalf("idle frame len = %d, want %d", len(idle), tr.videoW*tr.videoH) + } + if features := tr.Features(); features.MaxPayloadSize != defaultMaxPayloadSize { + t.Fatalf("Features() = %+v", features) + } + + tr.videoQRSize = defaultMaxPayloadSize + if features := tr.Features(); features.MaxPayloadSize <= defaultMaxPayloadSize { + t.Fatalf("Features(large qr) = %+v", features) + } + + tr.closed.Store(true) + if err := tr.enqueueFrame([]byte("closed"), false); !errors.Is(err, ErrTransportClosed) { + t.Fatalf("enqueueFrame(closed) error = %v, want %v", err, ErrTransportClosed) + } +} + +func TestNextOutboundFrameStopsWhenClosed(t *testing.T) { + tr := &streamTransport{ + outbound: make(chan []byte, 1), + outboundAck: make(chan []byte, 1), + closeCh: make(chan struct{}), + } + close(tr.closeCh) + if got, ok := tr.nextOutboundFrame(); ok || got != nil { + t.Fatalf("nextOutboundFrame(closed) = %q/%v, want nil/false", got, ok) + } +} diff --git a/internal/transport/vp8channel/transport_unit_test.go b/internal/transport/vp8channel/transport_unit_test.go new file mode 100644 index 0000000..8165ac6 --- /dev/null +++ b/internal/transport/vp8channel/transport_unit_test.go @@ -0,0 +1,272 @@ +package vp8channel + +import ( + "bytes" + "context" + "encoding/binary" + "errors" + "testing" + + "github.com/openlibrecommunity/olcrtc/internal/carrier" + "github.com/openlibrecommunity/olcrtc/internal/transport" + "github.com/pion/rtp" + "github.com/pion/webrtc/v4" +) + +type fakeVideoSession struct { + stream *fakeVideoStream + err error +} + +func (s *fakeVideoSession) Capabilities() carrier.Capabilities { + return carrier.Capabilities{VideoTrack: true} +} +func (s *fakeVideoSession) OpenVideoTrack() (carrier.VideoTrack, error) { + if s.err != nil { + return nil, s.err + } + return s.stream, nil +} + +type fakeVideoStream struct { + connectErr error + closeErr error + canSend bool + trackAdded bool + trackCB func(*webrtc.TrackRemote, *webrtc.RTPReceiver) + reconnect func() + should func() bool + ended func(string) + watched bool + closed bool +} + +func (s *fakeVideoStream) Connect(context.Context) error { return s.connectErr } +func (s *fakeVideoStream) Close() error { + s.closed = true + return s.closeErr +} +func (s *fakeVideoStream) SetReconnectCallback(cb func()) { s.reconnect = cb } +func (s *fakeVideoStream) SetShouldReconnect(fn func() bool) { s.should = fn } +func (s *fakeVideoStream) SetEndedCallback(cb func(string)) { s.ended = cb } +func (s *fakeVideoStream) WatchConnection(context.Context) { s.watched = true } +func (s *fakeVideoStream) CanSend() bool { return s.canSend } +func (s *fakeVideoStream) AddTrack(webrtc.TrackLocal) error { s.trackAdded = true; return nil } +func (s *fakeVideoStream) SetTrackHandler(cb func(*webrtc.TrackRemote, *webrtc.RTPReceiver)) { + s.trackCB = cb +} + +type nonVideoSession struct{} + +func (s *nonVideoSession) Capabilities() carrier.Capabilities { return carrier.Capabilities{} } + +func TestNewConnectSendCallbacksFeaturesAndClose(t *testing.T) { + stream := &fakeVideoStream{canSend: true} + name := "vp8channel-unit-new" + carrier.Register(name, func(context.Context, carrier.Config) (carrier.Session, error) { + return &fakeVideoSession{stream: stream}, nil + }) + + trIface, err := New(context.Background(), transport.Config{ + Carrier: name, + ClientID: "client", + VP8FPS: 30, + VP8BatchSize: 1, + }) + if err != nil { + t.Fatalf("New() error = %v", err) + } + tr := trIface.(*streamTransport) + if !stream.trackAdded || stream.trackCB == nil { + t.Fatal("New() did not attach track and handler") + } + if err := tr.Connect(context.Background()); err != nil { + t.Fatalf("Connect() error = %v", err) + } + if tr.kcp == nil || !tr.writerUp.Load() { + t.Fatal("Connect() did not initialize kcp/writer") + } + tr.SetReconnectCallback(func() {}) + tr.SetShouldReconnect(func() bool { return true }) + tr.SetEndedCallback(func(string) {}) + tr.WatchConnection(context.Background()) + if stream.reconnect == nil || stream.should == nil || stream.ended == nil || !stream.watched { + t.Fatal("callbacks/watch were not forwarded") + } + if !tr.CanSend() { + t.Fatal("CanSend() = false, want true") + } + if features := tr.Features(); !features.Reliable || !features.Ordered || !features.MessageOriented || features.MaxPayloadSize == 0 { + t.Fatalf("Features() = %+v", features) + } + if err := tr.Send([]byte("payload")); err != nil { + t.Fatalf("Send() error = %v", err) + } + tr.drainOutbound() + if err := tr.Close(); err != nil { + t.Fatalf("Close() error = %v", err) + } + if err := tr.Send([]byte("closed")); !errors.Is(err, ErrTransportClosed) { + t.Fatalf("Send(closed) error = %v, want %v", err, ErrTransportClosed) + } +} + +func TestNewErrorPaths(t *testing.T) { + carrier.Register("vp8channel-create-fails", func(context.Context, carrier.Config) (carrier.Session, error) { + return nil, errors.New("boom") + }) + if _, err := New(context.Background(), transport.Config{Carrier: "vp8channel-create-fails"}); err == nil || err.Error() != "create provider transport: boom" { + t.Fatalf("New() error = %v", err) + } + + carrier.Register("vp8channel-no-video", func(context.Context, carrier.Config) (carrier.Session, error) { + return &nonVideoSession{}, nil + }) + if _, err := New(context.Background(), transport.Config{Carrier: "vp8channel-no-video"}); !errors.Is(err, ErrVideoTrackUnsupported) { + t.Fatalf("New() error = %v, want %v", err, ErrVideoTrackUnsupported) + } + + carrier.Register("vp8channel-open-fails", func(context.Context, carrier.Config) (carrier.Session, error) { + return &fakeVideoSession{err: errors.New("open boom")}, nil + }) + if _, err := New(context.Background(), transport.Config{Carrier: "vp8channel-open-fails"}); err == nil || err.Error() != "open video track: open boom" { + t.Fatalf("New() error = %v", err) + } +} + +func TestEpochHeaderTokenAndOutboundCapacity(t *testing.T) { + tr := &streamTransport{ + stream: &fakeVideoStream{canSend: true}, + outbound: make(chan []byte, 10), + closeCh: make(chan struct{}), + writerDone: make(chan struct{}), + bindingToken: bindingToken("client"), + localEpoch: 0x01020304, + } + + hdr := tr.epochHeader() + if hdr[0] != kcpFrameMagic || + binary.BigEndian.Uint32(hdr[tokenOff:epochOff]) != tr.bindingToken || + binary.BigEndian.Uint32(hdr[epochOff:]) != tr.localEpoch { + t.Fatalf("epochHeader() = %x", hdr) + } + if bindingToken("") == 0 || randomEpoch() == 0 { + t.Fatal("bindingToken/randomEpoch returned zero") + } + + for len(tr.outbound) < cap(tr.outbound)*canSendHighWatermark/100 { + tr.outbound <- []byte("queued") + } + if tr.CanSend() { + t.Fatal("CanSend() = true at high watermark") + } + tr.drainOutbound() + if !tr.CanSend() { + t.Fatal("CanSend() = false after drain") + } + tr.closed.Store(true) + if tr.CanSend() { + t.Fatal("CanSend() = true after closed") + } +} + +func TestVP8FrameStateAssemblesAndRejectsCorruptFrames(t *testing.T) { + frame := append([]byte{kcpFrameMagic}, bytes.Repeat([]byte{0x01}, epochHdrLen)...) + var state vp8FrameState + + got := state.processRTPPacket(&rtp.Packet{ + Header: rtp.Header{SequenceNumber: 10, Marker: true}, + Payload: append([]byte{0x10}, frame...), + }) + if !bytes.Equal(got, frame) { + t.Fatalf("single-packet frame = %x, want %x", got, frame) + } + + state = vp8FrameState{} + if got := state.processRTPPacket(&rtp.Packet{ + Header: rtp.Header{SequenceNumber: 20}, + Payload: append([]byte{0x10}, frame[:4]...), + }); got != nil { + t.Fatalf("partial frame = %x, want nil", got) + } + got = state.processRTPPacket(&rtp.Packet{ + Header: rtp.Header{SequenceNumber: 21, Marker: true}, + Payload: append([]byte{0x00}, frame[4:]...), + }) + if !bytes.Equal(got, frame) { + t.Fatalf("fragmented frame = %x, want %x", got, frame) + } + + state = vp8FrameState{} + _ = state.processRTPPacket(&rtp.Packet{ + Header: rtp.Header{SequenceNumber: 30}, + Payload: append([]byte{0x10}, frame[:4]...), + }) + if got := state.processRTPPacket(&rtp.Packet{ + Header: rtp.Header{SequenceNumber: 32, Marker: true}, + Payload: append([]byte{0x00}, frame[4:]...), + }); got != nil { + t.Fatalf("frame after sequence gap = %x, want nil", got) + } + + state = vp8FrameState{} + if got := state.processRTPPacket(&rtp.Packet{ + Header: rtp.Header{SequenceNumber: 40, Marker: true}, + Payload: []byte{}, + }); got != nil { + t.Fatalf("bad vp8 payload = %x, want nil", got) + } +} + +func TestHandleIncomingFrameEpochFilteringAndReconnect(t *testing.T) { + called := 0 + tr := &streamTransport{ + stream: &fakeVideoStream{canSend: true}, + outbound: make(chan []byte, 16), + closeCh: make(chan struct{}), + writerDone: make(chan struct{}), + bindingToken: bindingToken("client"), + localEpoch: 0x100, + onData: func([]byte) { called++ }, + } + defer func() { + _ = tr.Close() + }() + + mkFrame := func(token, epoch uint32, payload []byte) []byte { + frame := make([]byte, epochHdrLen+len(payload)) + frame[0] = kcpFrameMagic + binary.BigEndian.PutUint32(frame[tokenOff:epochOff], token) + binary.BigEndian.PutUint32(frame[epochOff:epochHdrLen], epoch) + copy(frame[epochHdrLen:], payload) + return frame + } + + tr.handleIncomingFrame(mkFrame(bindingToken("other"), 1, []byte("x"))) + tr.handleIncomingFrame(mkFrame(tr.bindingToken, tr.localEpoch, []byte("self"))) + tr.handleIncomingFrame(mkFrame(tr.bindingToken, 1, nil)) + if tr.hadPeer.Load() || called != 0 { + t.Fatal("filtered frames changed peer state") + } + + tr.handleIncomingFrame(mkFrame(tr.bindingToken, 1, []byte("first"))) + if !tr.hadPeer.Load() || tr.peerEpoch.Load() != 1 { + t.Fatalf("peer state after first frame: had=%v epoch=%d", tr.hadPeer.Load(), tr.peerEpoch.Load()) + } + + reconnected := false + tr.SetReconnectCallback(func() { reconnected = true }) + stream := tr.stream.(*fakeVideoStream) + if stream.reconnect == nil { + t.Fatal("SetReconnectCallback did not install stream callback") + } + stream.reconnect() + if !reconnected || tr.kcp == nil { + t.Fatalf("stream reconnect did not reset/callback: reconnected=%v kcp=%v", reconnected, tr.kcp) + } + reconnected = false + tr.handleIncomingFrame(mkFrame(tr.bindingToken, 2, []byte("after-restart"))) + if !reconnected || tr.peerEpoch.Load() != 2 || tr.kcp == nil { + t.Fatalf("epoch change did not reset/reconnect: reconnected=%v epoch=%d kcp=%v", reconnected, tr.peerEpoch.Load(), tr.kcp) + } +} diff --git a/mobile/mobile.go b/mobile/mobile.go index 3604ad1..0ee9a45 100644 --- a/mobile/mobile.go +++ b/mobile/mobile.go @@ -52,14 +52,15 @@ const ( //nolint:gochecknoglobals // Mobile bindings expose a singleton runtime controlled by the embedding app. var ( - mu sync.Mutex - defaults mobileConfig - defaultsSet sync.Once - registerSet sync.Once - cancel context.CancelFunc - done chan struct{} - ready chan struct{} - errRun error + mu sync.Mutex + defaults mobileConfig + defaultsSet sync.Once + registerSet sync.Once + runClientWithReady = client.RunWithReady + cancel context.CancelFunc + done chan struct{} + ready chan struct{} + errRun error ) type mobileConfig struct { @@ -200,7 +201,7 @@ func Check( startedAt := time.Now() go func() { - doneCh <- client.RunWithReady( + doneCh <- runClientWithReady( ctx, defaultLink, transportName, @@ -288,7 +289,7 @@ func startWithConfig( go func() { defer cancelFunc() - err := client.RunWithReady( + err := runClientWithReady( ctx, cfg.link, cfg.transport, diff --git a/mobile/mobile_test.go b/mobile/mobile_test.go index 72a26ee..c773644 100644 --- a/mobile/mobile_test.go +++ b/mobile/mobile_test.go @@ -1,6 +1,7 @@ package mobile import ( + "context" "errors" "log" "strings" @@ -39,6 +40,7 @@ func resetMobileGlobals(t *testing.T) { done = nil ready = nil errRun = nil + runClientWithReady = clientRunWithReady defaults = mobileConfig{} defaultsSet = sync.Once{} mu.Unlock() @@ -46,6 +48,8 @@ func resetMobileGlobals(t *testing.T) { logger.SetVerbose(false) } +var clientRunWithReady = runClientWithReady + func TestProtectorAndLogging(t *testing.T) { resetMobileGlobals(t) p := &testProtector{} @@ -151,6 +155,192 @@ func TestStartValidation(t *testing.T) { resetMobileGlobals(t) } +func TestStartWithInjectedRunnerLifecycle(t *testing.T) { + resetMobileGlobals(t) + t.Cleanup(func() { + resetMobileGlobals(t) + }) + + runClientWithReady = func( + ctx context.Context, + linkName, transportName, carrierName, roomURL, keyHex, clientID string, + localAddr string, + dnsServer, socksUser, socksPass string, + onReady func(), + videoWidth int, + videoHeight int, + videoFPS int, + videoBitrate string, + videoHW string, + videoQRSize int, + videoQRRecovery string, + videoCodec string, + videoTileModule int, + videoTileRS int, + vp8FPS int, + vp8BatchSize int, + ) error { + if linkName != defaultLink || transportName != dataTransport || carrierName != carrierJazz || + roomURL != "any" || clientID != "client" || localAddr != "127.0.0.1:1080" || + dnsServer != defaultDNSServer || vp8FPS != 60 || vp8BatchSize != 8 { + t.Fatalf("RunWithReady args mismatch: link=%q transport=%q carrier=%q room=%q client=%q local=%q dns=%q vp8=%d/%d", + linkName, transportName, carrierName, roomURL, clientID, localAddr, dnsServer, vp8FPS, vp8BatchSize) + } + onReady() + <-ctx.Done() + return ctx.Err() + } + + if err := StartWithTransport(carrierJazz, "dc", "", "client", "key", 1080, "", ""); err != nil { + t.Fatalf("StartWithTransport() error = %v", err) + } + if !IsRunning() { + t.Fatal("IsRunning() = false, want true") + } + if err := WaitReady(100); err != nil { + t.Fatalf("WaitReady() error = %v", err) + } + Stop() + if IsRunning() { + t.Fatal("IsRunning() = true after Stop") + } +} + +func TestStartUsesDefaultsAndCheckWithInjectedRunner(t *testing.T) { + resetMobileGlobals(t) + t.Cleanup(func() { + resetMobileGlobals(t) + }) + + runClientWithReady = func( + ctx context.Context, + linkName, transportName, carrierName, roomURL, keyHex, clientID string, + localAddr string, + dnsServer, socksUser, socksPass string, + onReady func(), + videoWidth int, + videoHeight int, + videoFPS int, + videoBitrate string, + videoHW string, + videoQRSize int, + videoQRRecovery string, + videoCodec string, + videoTileModule int, + videoTileRS int, + vp8FPS int, + vp8BatchSize int, + ) error { + if transportName != defaultTransport || roomURL != "https://telemost.yandex.ru/j/room" || + localAddr != "127.0.0.1:1081" || socksUser != "u" || socksPass != "p" { + t.Fatalf("Start args mismatch: transport=%q room=%q local=%q user/pass=%q/%q", + transportName, roomURL, localAddr, socksUser, socksPass) + } + onReady() + <-ctx.Done() + return ctx.Err() + } + + if err := Start("telemost", "room", "client", "key", 1081, "u", "p"); err != nil { + t.Fatalf("Start() error = %v", err) + } + if err := WaitReady(100); err != nil { + t.Fatalf("WaitReady() error = %v", err) + } + Stop() + + runClientWithReady = func( + ctx context.Context, + linkName, transportName, carrierName, roomURL, keyHex, clientID string, + localAddr string, + dnsServer, socksUser, socksPass string, + onReady func(), + videoWidth int, + videoHeight int, + videoFPS int, + videoBitrate string, + videoHW string, + videoQRSize int, + videoQRRecovery string, + videoCodec string, + videoTileModule int, + videoTileRS int, + vp8FPS int, + vp8BatchSize int, + ) error { + if transportName != dataTransport || vp8FPS != 1 || vp8BatchSize != 64 { + t.Fatalf("Check args mismatch: transport=%q vp8=%d/%d", transportName, vp8FPS, vp8BatchSize) + } + onReady() + <-ctx.Done() + return nil + } + elapsed, err := Check("jazz", "dc", "", "client", "key", 1082, 100, -1, 999) + if err != nil { + t.Fatalf("Check() error = %v", err) + } + if elapsed < 0 { + t.Fatalf("Check() elapsed = %d", elapsed) + } +} + +func TestCheckTimeoutAndRunError(t *testing.T) { + resetMobileGlobals(t) + t.Cleanup(func() { + resetMobileGlobals(t) + }) + + runClientWithReady = func( + ctx context.Context, + linkName, transportName, carrierName, roomURL, keyHex, clientID string, + localAddr string, + dnsServer, socksUser, socksPass string, + onReady func(), + videoWidth int, + videoHeight int, + videoFPS int, + videoBitrate string, + videoHW string, + videoQRSize int, + videoQRRecovery string, + videoCodec string, + videoTileModule int, + videoTileRS int, + vp8FPS int, + vp8BatchSize int, + ) error { + <-ctx.Done() + return nil + } + if _, err := Check("telemost", defaultTransport, "room", "client", "key", 1083, 1, 30, 1); !errors.Is(err, errStartTimedOut) { + t.Fatalf("Check(timeout) error = %v, want %v", err, errStartTimedOut) + } + + want := errors.New("check failed") + runClientWithReady = func( + context.Context, + string, string, string, string, string, string, + string, + string, string, string, + func(), + int, int, int, + string, + string, + int, + string, + string, + int, + int, + int, + int, + ) error { + return want + } + if _, err := Check("telemost", defaultTransport, "room", "client", "key", 1084, 100, 30, 1); !errors.Is(err, want) { + t.Fatalf("Check(run error) = %v, want %v", err, want) + } +} + func TestWaitReadyStatesAndStop(t *testing.T) { resetMobileGlobals(t)