mirror of
https://github.com/openlibrecommunity/olcrtc.git
synced 2026-05-31 09:29:45 +00:00
feat(test): up test coverage
This commit is contained in:
@@ -6,14 +6,15 @@ import (
|
||||
"errors"
|
||||
"flag"
|
||||
"fmt"
|
||||
"io"
|
||||
"os"
|
||||
"os/signal"
|
||||
"path/filepath"
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
lksdk "github.com/livekit/server-sdk-go/v2"
|
||||
protoLogger "github.com/livekit/protocol/logger"
|
||||
lksdk "github.com/livekit/server-sdk-go/v2"
|
||||
"github.com/openlibrecommunity/olcrtc/internal/app/session"
|
||||
"github.com/openlibrecommunity/olcrtc/internal/logger"
|
||||
"github.com/openlibrecommunity/olcrtc/internal/names"
|
||||
@@ -22,27 +23,30 @@ import (
|
||||
// ErrDataDirRequired is returned when no data directory is specified.
|
||||
var ErrDataDirRequired = errors.New("data directory required (use -data data)")
|
||||
|
||||
//nolint:gochecknoglobals // Tests replace the long-running session runner with a bounded function.
|
||||
var runSession = session.Run
|
||||
|
||||
type config struct {
|
||||
mode string
|
||||
link string
|
||||
transport string
|
||||
carrier string
|
||||
roomID string
|
||||
clientID string
|
||||
provider string
|
||||
socksPort int
|
||||
socksHost string
|
||||
keyHex string
|
||||
debug bool
|
||||
dataDir string
|
||||
dnsServer string
|
||||
socksProxyAddr string
|
||||
socksProxyPort int
|
||||
videoWidth int
|
||||
videoHeight int
|
||||
videoFPS int
|
||||
videoBitrate string
|
||||
videoHW string
|
||||
mode string
|
||||
link string
|
||||
transport string
|
||||
carrier string
|
||||
roomID string
|
||||
clientID string
|
||||
provider string
|
||||
socksPort int
|
||||
socksHost string
|
||||
keyHex string
|
||||
debug bool
|
||||
dataDir string
|
||||
dnsServer string
|
||||
socksProxyAddr string
|
||||
socksProxyPort int
|
||||
videoWidth int
|
||||
videoHeight int
|
||||
videoFPS int
|
||||
videoBitrate string
|
||||
videoHW string
|
||||
videoQRSize int
|
||||
videoQRRecovery string
|
||||
videoCodec string
|
||||
@@ -60,9 +64,20 @@ func main() {
|
||||
}
|
||||
|
||||
func run() error {
|
||||
return runWithArgs(os.Args[1:])
|
||||
}
|
||||
|
||||
func runWithArgs(args []string) error {
|
||||
session.RegisterDefaults()
|
||||
|
||||
cfg := parseFlags()
|
||||
cfg, err := parseFlagsFrom(args, flag.ExitOnError)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return runWithConfig(cfg)
|
||||
}
|
||||
|
||||
func runWithConfig(cfg config) error {
|
||||
configureLogging(cfg.debug)
|
||||
|
||||
if err := session.Validate(toSessionConfig(cfg)); err != nil {
|
||||
@@ -90,7 +105,7 @@ func run() error {
|
||||
|
||||
errCh := make(chan error, 1)
|
||||
go func() {
|
||||
errCh <- session.Run(ctx, toSessionConfig(cfg))
|
||||
errCh <- runSession(ctx, toSessionConfig(cfg))
|
||||
}()
|
||||
|
||||
select {
|
||||
@@ -104,43 +119,51 @@ func run() error {
|
||||
}
|
||||
|
||||
func parseFlags() config {
|
||||
cfg := config{}
|
||||
|
||||
flag.StringVar(&cfg.mode, "mode", "", "Mode: srv or cnc")
|
||||
flag.StringVar(&cfg.link, "link", "", "Link: direct (p2p connection type)")
|
||||
flag.StringVar(&cfg.transport, "transport", "", "Transport: datachannel, videochannel, seichannel")
|
||||
flag.StringVar(&cfg.carrier, "carrier", "", "Carrier: telemost, jazz, wbstream")
|
||||
flag.StringVar(&cfg.roomID, "id", "", "Room ID")
|
||||
flag.StringVar(&cfg.clientID, "client-id", "", "Client ID: binds one srv to one cnc (required)")
|
||||
flag.StringVar(&cfg.provider, "provider", "", "Deprecated alias for -carrier")
|
||||
flag.IntVar(&cfg.socksPort, "socks-port", 0, "SOCKS5 port (client only)")
|
||||
flag.StringVar(&cfg.socksHost, "socks-host", "", "SOCKS5 listen host (client only)")
|
||||
flag.StringVar(&cfg.keyHex, "key", "", "Shared encryption key (hex)")
|
||||
flag.BoolVar(&cfg.debug, "debug", false, "Enable verbose logging")
|
||||
flag.StringVar(&cfg.dataDir, "data", "", "Path to data directory")
|
||||
flag.StringVar(&cfg.dnsServer, "dns", "", "DNS server (e.g. 1.1.1.1:53)")
|
||||
flag.StringVar(&cfg.socksProxyAddr, "socks-proxy", "", "SOCKS5 proxy address (server only)")
|
||||
flag.IntVar(&cfg.socksProxyPort, "socks-proxy-port", 0, "SOCKS5 proxy port (server only)")
|
||||
flag.IntVar(&cfg.videoWidth, "video-w", 0, "Video logical width (videochannel only)")
|
||||
flag.IntVar(&cfg.videoHeight, "video-h", 0, "Video logical height (videochannel only)")
|
||||
flag.IntVar(&cfg.videoFPS, "video-fps", 0, "Video frames per second (videochannel only)")
|
||||
flag.StringVar(&cfg.videoBitrate, "video-bitrate", "", "Video bitrate (videochannel only)")
|
||||
flag.StringVar(&cfg.videoHW, "video-hw", "", "Hardware acceleration (none, nvenc)")
|
||||
flag.IntVar(&cfg.videoQRSize, "video-qr-size", 0, "Video QR code fragment size (videochannel only)")
|
||||
flag.StringVar(&cfg.videoQRRecovery, "video-qr-recovery", "low",
|
||||
"QR error correction: low (7%), medium (15%), high (25%), highest (30%)")
|
||||
flag.StringVar(&cfg.videoCodec, "video-codec", "qrcode", "Visual codec: qrcode or tile")
|
||||
flag.IntVar(&cfg.videoTileModule, "video-tile-module", 0,
|
||||
"Tile module size in pixels 1..270 (videochannel tile only, default 4)")
|
||||
flag.IntVar(&cfg.videoTileRS, "video-tile-rs", 0,
|
||||
"Tile Reed-Solomon parity percent 0..200 (videochannel tile only, default 20)")
|
||||
flag.IntVar(&cfg.vp8FPS, "vp8-fps", 0, "VP8 frames per second (vp8channel only, default 25)")
|
||||
flag.IntVar(&cfg.vp8BatchSize, "vp8-batch", 0, "VP8 frames per tick (vp8channel only, default 1)")
|
||||
flag.Parse()
|
||||
|
||||
cfg, _ := parseFlagsFrom(os.Args[1:], flag.ExitOnError)
|
||||
return cfg
|
||||
}
|
||||
|
||||
func parseFlagsFrom(args []string, errorHandling flag.ErrorHandling) (config, error) {
|
||||
cfg := config{}
|
||||
fs := flag.NewFlagSet("olcrtc", errorHandling)
|
||||
if errorHandling == flag.ContinueOnError {
|
||||
fs.SetOutput(io.Discard)
|
||||
}
|
||||
|
||||
fs.StringVar(&cfg.mode, "mode", "", "Mode: srv or cnc")
|
||||
fs.StringVar(&cfg.link, "link", "", "Link: direct (p2p connection type)")
|
||||
fs.StringVar(&cfg.transport, "transport", "", "Transport: datachannel, videochannel, seichannel")
|
||||
fs.StringVar(&cfg.carrier, "carrier", "", "Carrier: telemost, jazz, wbstream")
|
||||
fs.StringVar(&cfg.roomID, "id", "", "Room ID")
|
||||
fs.StringVar(&cfg.clientID, "client-id", "", "Client ID: binds one srv to one cnc (required)")
|
||||
fs.StringVar(&cfg.provider, "provider", "", "Deprecated alias for -carrier")
|
||||
fs.IntVar(&cfg.socksPort, "socks-port", 0, "SOCKS5 port (client only)")
|
||||
fs.StringVar(&cfg.socksHost, "socks-host", "", "SOCKS5 listen host (client only)")
|
||||
fs.StringVar(&cfg.keyHex, "key", "", "Shared encryption key (hex)")
|
||||
fs.BoolVar(&cfg.debug, "debug", false, "Enable verbose logging")
|
||||
fs.StringVar(&cfg.dataDir, "data", "", "Path to data directory")
|
||||
fs.StringVar(&cfg.dnsServer, "dns", "", "DNS server (e.g. 1.1.1.1:53)")
|
||||
fs.StringVar(&cfg.socksProxyAddr, "socks-proxy", "", "SOCKS5 proxy address (server only)")
|
||||
fs.IntVar(&cfg.socksProxyPort, "socks-proxy-port", 0, "SOCKS5 proxy port (server only)")
|
||||
fs.IntVar(&cfg.videoWidth, "video-w", 0, "Video logical width (videochannel only)")
|
||||
fs.IntVar(&cfg.videoHeight, "video-h", 0, "Video logical height (videochannel only)")
|
||||
fs.IntVar(&cfg.videoFPS, "video-fps", 0, "Video frames per second (videochannel only)")
|
||||
fs.StringVar(&cfg.videoBitrate, "video-bitrate", "", "Video bitrate (videochannel only)")
|
||||
fs.StringVar(&cfg.videoHW, "video-hw", "", "Hardware acceleration (none, nvenc)")
|
||||
fs.IntVar(&cfg.videoQRSize, "video-qr-size", 0, "Video QR code fragment size (videochannel only)")
|
||||
fs.StringVar(&cfg.videoQRRecovery, "video-qr-recovery", "low",
|
||||
"QR error correction: low (7%), medium (15%), high (25%), highest (30%)")
|
||||
fs.StringVar(&cfg.videoCodec, "video-codec", "qrcode", "Visual codec: qrcode or tile")
|
||||
fs.IntVar(&cfg.videoTileModule, "video-tile-module", 0,
|
||||
"Tile module size in pixels 1..270 (videochannel tile only, default 4)")
|
||||
fs.IntVar(&cfg.videoTileRS, "video-tile-rs", 0,
|
||||
"Tile Reed-Solomon parity percent 0..200 (videochannel tile only, default 20)")
|
||||
fs.IntVar(&cfg.vp8FPS, "vp8-fps", 0, "VP8 frames per second (vp8channel only, default 25)")
|
||||
fs.IntVar(&cfg.vp8BatchSize, "vp8-batch", 0, "VP8 frames per tick (vp8channel only, default 1)")
|
||||
|
||||
return cfg, fs.Parse(args)
|
||||
}
|
||||
|
||||
func configureLogging(debug bool) {
|
||||
if debug {
|
||||
logger.SetVerbose(true)
|
||||
|
||||
@@ -1,7 +1,9 @@
|
||||
package main
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"flag"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"testing"
|
||||
@@ -58,6 +60,117 @@ func TestToSessionConfigAndFirstNonEmpty(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestParseFlagsFrom(t *testing.T) {
|
||||
cfg, err := parseFlagsFrom([]string{
|
||||
"-mode", "srv",
|
||||
"-link", "direct",
|
||||
"-transport", "vp8channel",
|
||||
"-carrier", "telemost",
|
||||
"-id", "room",
|
||||
"-client-id", "client",
|
||||
"-socks-port", "1080",
|
||||
"-socks-host", "127.0.0.1",
|
||||
"-key", "key",
|
||||
"-debug",
|
||||
"-data", "data",
|
||||
"-dns", "9.9.9.9:53",
|
||||
"-socks-proxy", "proxy",
|
||||
"-socks-proxy-port", "1081",
|
||||
"-video-w", "640",
|
||||
"-video-h", "480",
|
||||
"-video-fps", "30",
|
||||
"-video-bitrate", "1M",
|
||||
"-video-hw", "none",
|
||||
"-video-qr-size", "128",
|
||||
"-video-qr-recovery", "high",
|
||||
"-video-codec", "tile",
|
||||
"-video-tile-module", "6",
|
||||
"-video-tile-rs", "40",
|
||||
"-vp8-fps", "24",
|
||||
"-vp8-batch", "3",
|
||||
}, flag.ContinueOnError)
|
||||
if err != nil {
|
||||
t.Fatalf("parseFlagsFrom() error = %v", err)
|
||||
}
|
||||
if cfg.mode != "srv" || cfg.carrier != "telemost" || cfg.roomID != "room" ||
|
||||
cfg.debug != true || cfg.videoCodec != "tile" || cfg.videoTileRS != 40 ||
|
||||
cfg.vp8FPS != 24 || cfg.vp8BatchSize != 3 {
|
||||
t.Fatalf("parseFlagsFrom() = %+v", cfg)
|
||||
}
|
||||
|
||||
_, err = parseFlagsFrom([]string{"-bad"}, flag.ContinueOnError)
|
||||
if err == nil {
|
||||
t.Fatal("parseFlagsFrom(bad flag) error = nil")
|
||||
}
|
||||
}
|
||||
|
||||
func TestRunWithConfigValidationAndDataDirErrors(t *testing.T) {
|
||||
session.RegisterDefaults()
|
||||
cfg := config{
|
||||
mode: "srv",
|
||||
link: "direct",
|
||||
transport: "datachannel",
|
||||
carrier: "jazz",
|
||||
clientID: "client",
|
||||
keyHex: "key",
|
||||
dnsServer: "1.1.1.1:53",
|
||||
videoCodec: "qrcode",
|
||||
}
|
||||
if err := runWithConfig(cfg); !errors.Is(err, ErrDataDirRequired) {
|
||||
t.Fatalf("runWithConfig(no data dir) = %v, want %v", err, ErrDataDirRequired)
|
||||
}
|
||||
|
||||
cfg.mode = ""
|
||||
if err := runWithConfig(cfg); err == nil {
|
||||
t.Fatal("runWithConfig(invalid config) error = nil")
|
||||
}
|
||||
}
|
||||
|
||||
func TestRunWithArgsSuccessfulSessionReturn(t *testing.T) {
|
||||
dir := t.TempDir()
|
||||
if err := os.WriteFile(filepath.Join(dir, "names"), []byte("A\n"), 0o600); err != nil {
|
||||
t.Fatalf("WriteFile(names) error = %v", err)
|
||||
}
|
||||
if err := os.WriteFile(filepath.Join(dir, "surnames"), []byte("B\n"), 0o600); err != nil {
|
||||
t.Fatalf("WriteFile(surnames) error = %v", err)
|
||||
}
|
||||
|
||||
oldRunSession := runSession
|
||||
t.Cleanup(func() {
|
||||
runSession = oldRunSession
|
||||
})
|
||||
called := false
|
||||
runSession = func(ctx context.Context, cfg session.Config) error {
|
||||
called = true
|
||||
if cfg.Mode != "srv" || cfg.Carrier != "jazz" || cfg.ClientID != "client" {
|
||||
t.Fatalf("session config = %+v", cfg)
|
||||
}
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
t.Fatal("context canceled before session returned")
|
||||
default:
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
err := runWithArgs([]string{
|
||||
"-mode", "srv",
|
||||
"-link", "direct",
|
||||
"-transport", "datachannel",
|
||||
"-carrier", "jazz",
|
||||
"-client-id", "client",
|
||||
"-key", "key",
|
||||
"-dns", "1.1.1.1:53",
|
||||
"-data", dir,
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("runWithArgs() error = %v", err)
|
||||
}
|
||||
if !called {
|
||||
t.Fatal("runWithArgs() did not call session runner")
|
||||
}
|
||||
}
|
||||
|
||||
func TestConfigureLogging(t *testing.T) {
|
||||
logger.SetVerbose(false)
|
||||
configureLogging(true)
|
||||
|
||||
159
internal/transport/seichannel/transport_unit_test.go
Normal file
159
internal/transport/seichannel/transport_unit_test.go
Normal file
@@ -0,0 +1,159 @@
|
||||
package seichannel
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"hash/crc32"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/openlibrecommunity/olcrtc/internal/carrier"
|
||||
"github.com/openlibrecommunity/olcrtc/internal/transport"
|
||||
"github.com/pion/webrtc/v4"
|
||||
)
|
||||
|
||||
type fakeVideoSession struct {
|
||||
stream *fakeVideoStream
|
||||
err error
|
||||
}
|
||||
|
||||
func (s *fakeVideoSession) Capabilities() carrier.Capabilities {
|
||||
return carrier.Capabilities{VideoTrack: true}
|
||||
}
|
||||
func (s *fakeVideoSession) OpenVideoTrack() (carrier.VideoTrack, error) {
|
||||
if s.err != nil {
|
||||
return nil, s.err
|
||||
}
|
||||
return s.stream, nil
|
||||
}
|
||||
|
||||
type fakeVideoStream struct {
|
||||
connectErr error
|
||||
closeErr error
|
||||
canSend bool
|
||||
|
||||
trackAdded bool
|
||||
trackCB func(*webrtc.TrackRemote, *webrtc.RTPReceiver)
|
||||
reconnect func()
|
||||
should func() bool
|
||||
ended func(string)
|
||||
watched bool
|
||||
closed bool
|
||||
}
|
||||
|
||||
func (s *fakeVideoStream) Connect(context.Context) error { return s.connectErr }
|
||||
func (s *fakeVideoStream) Close() error {
|
||||
s.closed = true
|
||||
return s.closeErr
|
||||
}
|
||||
func (s *fakeVideoStream) SetReconnectCallback(cb func()) { s.reconnect = cb }
|
||||
func (s *fakeVideoStream) SetShouldReconnect(fn func() bool) { s.should = fn }
|
||||
func (s *fakeVideoStream) SetEndedCallback(cb func(string)) { s.ended = cb }
|
||||
func (s *fakeVideoStream) WatchConnection(context.Context) { s.watched = true }
|
||||
func (s *fakeVideoStream) CanSend() bool { return s.canSend }
|
||||
func (s *fakeVideoStream) AddTrack(webrtc.TrackLocal) error { s.trackAdded = true; return nil }
|
||||
func (s *fakeVideoStream) SetTrackHandler(cb func(*webrtc.TrackRemote, *webrtc.RTPReceiver)) {
|
||||
s.trackCB = cb
|
||||
}
|
||||
|
||||
type nonVideoSession struct{}
|
||||
|
||||
func (s *nonVideoSession) Capabilities() carrier.Capabilities { return carrier.Capabilities{} }
|
||||
|
||||
func TestNewConnectCallbacksAndFeatures(t *testing.T) {
|
||||
stream := &fakeVideoStream{canSend: true}
|
||||
name := "seichannel-unit-new"
|
||||
carrier.Register(name, func(context.Context, carrier.Config) (carrier.Session, error) {
|
||||
return &fakeVideoSession{stream: stream}, nil
|
||||
})
|
||||
|
||||
trIface, err := New(context.Background(), transport.Config{Carrier: name})
|
||||
if err != nil {
|
||||
t.Fatalf("New() error = %v", err)
|
||||
}
|
||||
tr := trIface.(*streamTransport)
|
||||
if !stream.trackAdded || stream.trackCB == nil {
|
||||
t.Fatal("New() did not attach track and handler")
|
||||
}
|
||||
if err := tr.Connect(context.Background()); err != nil {
|
||||
t.Fatalf("Connect() error = %v", err)
|
||||
}
|
||||
if !tr.writerUp.Load() {
|
||||
t.Fatal("Connect() did not start writer")
|
||||
}
|
||||
tr.SetReconnectCallback(func() {})
|
||||
tr.SetShouldReconnect(func() bool { return true })
|
||||
tr.SetEndedCallback(func(string) {})
|
||||
tr.WatchConnection(context.Background())
|
||||
if stream.reconnect == nil || stream.should == nil || stream.ended == nil || !stream.watched {
|
||||
t.Fatal("callbacks/watch were not forwarded")
|
||||
}
|
||||
if !tr.CanSend() {
|
||||
t.Fatal("CanSend() = false, want true")
|
||||
}
|
||||
if features := tr.Features(); !features.Reliable || !features.Ordered || !features.MessageOriented || features.MaxPayloadSize == 0 {
|
||||
t.Fatalf("Features() = %+v", features)
|
||||
}
|
||||
if err := tr.Close(); err != nil {
|
||||
t.Fatalf("Close() error = %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestNewErrorPaths(t *testing.T) {
|
||||
carrier.Register("seichannel-create-fails", func(context.Context, carrier.Config) (carrier.Session, error) {
|
||||
return nil, errors.New("boom")
|
||||
})
|
||||
if _, err := New(context.Background(), transport.Config{Carrier: "seichannel-create-fails"}); err == nil || err.Error() != "create provider transport: boom" {
|
||||
t.Fatalf("New() error = %v", err)
|
||||
}
|
||||
|
||||
carrier.Register("seichannel-no-video", func(context.Context, carrier.Config) (carrier.Session, error) {
|
||||
return &nonVideoSession{}, nil
|
||||
})
|
||||
if _, err := New(context.Background(), transport.Config{Carrier: "seichannel-no-video"}); !errors.Is(err, ErrVideoTrackUnsupported) {
|
||||
t.Fatalf("New() error = %v, want %v", err, ErrVideoTrackUnsupported)
|
||||
}
|
||||
|
||||
carrier.Register("seichannel-open-fails", func(context.Context, carrier.Config) (carrier.Session, error) {
|
||||
return &fakeVideoSession{err: errors.New("open boom")}, nil
|
||||
})
|
||||
if _, err := New(context.Background(), transport.Config{Carrier: "seichannel-open-fails"}); err == nil || err.Error() != "open video track: open boom" {
|
||||
t.Fatalf("New() error = %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestSendAckAndClosePaths(t *testing.T) {
|
||||
tr := &streamTransport{
|
||||
stream: &fakeVideoStream{canSend: true},
|
||||
outbound: make(chan []byte, 8),
|
||||
outboundAck: make(chan []byte, 8),
|
||||
closeCh: make(chan struct{}),
|
||||
writerDone: make(chan struct{}),
|
||||
ackWaiters: make(map[uint32]chan uint32),
|
||||
}
|
||||
|
||||
done := make(chan error, 1)
|
||||
payload := []byte("payload")
|
||||
go func() { done <- tr.Send(payload) }()
|
||||
|
||||
select {
|
||||
case frame := <-tr.outbound:
|
||||
decoded, err := decodeTransportFrame(frame)
|
||||
if err != nil {
|
||||
t.Fatalf("decodeTransportFrame() error = %v", err)
|
||||
}
|
||||
tr.resolveAck(decoded.seq, crc32.ChecksumIEEE(payload))
|
||||
case <-time.After(time.Second):
|
||||
t.Fatal("Send() did not enqueue frame")
|
||||
}
|
||||
|
||||
if err := <-done; err != nil {
|
||||
t.Fatalf("Send() error = %v", err)
|
||||
}
|
||||
if err := tr.Close(); err != nil {
|
||||
t.Fatalf("Close() error = %v", err)
|
||||
}
|
||||
if err := tr.Send([]byte("closed")); !errors.Is(err, ErrTransportClosed) {
|
||||
t.Fatalf("Send(closed) error = %v, want %v", err, ErrTransportClosed)
|
||||
}
|
||||
}
|
||||
@@ -76,6 +76,12 @@ func TestCodecSpecsAndArgs(t *testing.T) {
|
||||
if got := resolveEncoderCodec(vp8CodecSpec(), "none"); got != "libvpx" {
|
||||
t.Fatalf("resolveEncoderCodec(vp8,none) = %q", got)
|
||||
}
|
||||
if got := resolveEncoderCodec(vp9CodecSpec(), "nvenc"); got != "vp9_nvenc" {
|
||||
t.Fatalf("resolveEncoderCodec(vp9,nvenc) = %q", got)
|
||||
}
|
||||
if got := resolveEncoderCodec(codecSpec{mimeType: webrtc.MimeTypeAV1, encoder: "libaom-av1"}, "nvenc"); got != "av1_nvenc" {
|
||||
t.Fatalf("resolveEncoderCodec(av1,nvenc) = %q", got)
|
||||
}
|
||||
|
||||
args := buildEncoderArgs(vp8CodecSpec(), "vp8_nvenc", 320, 240, 30, "1M")
|
||||
for _, want := range []string{"-video_size", "320x240", "-framerate", "30", "vp8_nvenc", "-b:v", "1M", "ivf"} {
|
||||
@@ -87,6 +93,29 @@ func TestCodecSpecsAndArgs(t *testing.T) {
|
||||
if h264Args[len(h264Args)-2] != "h264" {
|
||||
t.Fatalf("h264 encoder args = %v", h264Args)
|
||||
}
|
||||
|
||||
if got := resolveDecoderName(h264CodecSpec(), "nvenc"); got != "h264_cuvid" {
|
||||
t.Fatalf("resolveDecoderName(h264,nvenc) = %q", got)
|
||||
}
|
||||
if got := resolveDecoderName(vp8CodecSpec(), "nvenc"); got != "vp8_cuvid" {
|
||||
t.Fatalf("resolveDecoderName(vp8,nvenc) = %q", got)
|
||||
}
|
||||
if got := resolveDecoderName(vp9CodecSpec(), "nvenc"); got != "vp9_cuvid" {
|
||||
t.Fatalf("resolveDecoderName(vp9,nvenc) = %q", got)
|
||||
}
|
||||
if got := resolveDecoderName(codecSpec{mimeType: "video/custom"}, "none"); got != "custom" {
|
||||
t.Fatalf("resolveDecoderName(custom,none) = %q", got)
|
||||
}
|
||||
decArgs := buildDecoderArgs(vp8CodecSpec(), "vp8", 320, 240, "gray")
|
||||
for _, want := range []string{"-f", "ivf", "-vcodec", "vp8", "scale=320:240:flags=neighbor,format=gray", "rawvideo"} {
|
||||
if !slices.Contains(decArgs, want) {
|
||||
t.Fatalf("buildDecoderArgs(vp8) = %v, missing %q", decArgs, want)
|
||||
}
|
||||
}
|
||||
h264DecArgs := buildDecoderArgs(h264CodecSpec(), "h264", 320, 240, "gray")
|
||||
if h264DecArgs[5] != "h264" {
|
||||
t.Fatalf("buildDecoderArgs(h264) = %v", h264DecArgs)
|
||||
}
|
||||
}
|
||||
|
||||
type shortWriter struct {
|
||||
@@ -105,6 +134,12 @@ type errWriter struct{}
|
||||
|
||||
func (w errWriter) Write([]byte) (int, error) { return 0, io.ErrClosedPipe }
|
||||
|
||||
type bufferWriteCloser struct {
|
||||
bytes.Buffer
|
||||
}
|
||||
|
||||
func (w *bufferWriteCloser) Close() error { return nil }
|
||||
|
||||
func TestIVFWritersAndWithStderr(t *testing.T) {
|
||||
var buf bytes.Buffer
|
||||
if err := writeIVFHeader(&buf, "VP80", 320, 240, 30); err != nil {
|
||||
@@ -137,3 +172,83 @@ func TestIVFWritersAndWithStderr(t *testing.T) {
|
||||
t.Fatalf("withStderr(nil) = %v", got)
|
||||
}
|
||||
}
|
||||
|
||||
func TestFFmpegProcessErrAndFrameValidation(t *testing.T) {
|
||||
enc := &ffmpegEncoder{
|
||||
stderr: bytes.NewBufferString("encoder failed"),
|
||||
frames: make(chan []byte, 1),
|
||||
frameSize: 4,
|
||||
}
|
||||
if _, err := enc.EncodeFrame([]byte("bad")); !errors.Is(err, ErrUnexpectedFrameSize) {
|
||||
t.Fatalf("EncodeFrame(short) error = %v, want %v", err, ErrUnexpectedFrameSize)
|
||||
}
|
||||
enc.setErr(errors.New("boom"))
|
||||
if _, err := enc.EncodeFrame([]byte("good")); err == nil || !strings.Contains(err.Error(), "encoder failed") {
|
||||
t.Fatalf("EncodeFrame(processErr) error = %v", err)
|
||||
}
|
||||
|
||||
dec := &ffmpegDecoder{stderr: bytes.NewBufferString("decoder failed")}
|
||||
dec.setErr(errors.New("boom"))
|
||||
if err := dec.PushSample([]byte("sample")); err == nil || !strings.Contains(err.Error(), "decoder failed") {
|
||||
t.Fatalf("PushSample(processErr) error = %v", err)
|
||||
}
|
||||
closed := &ffmpegDecoder{}
|
||||
closed.closed.Store(true)
|
||||
if err := closed.processErr(); !errors.Is(err, ErrTransportClosed) {
|
||||
t.Fatalf("decoder processErr(closed) = %v, want %v", err, ErrTransportClosed)
|
||||
}
|
||||
}
|
||||
|
||||
func TestFFmpegReadersAndSampleWriters(t *testing.T) {
|
||||
var ivf bytes.Buffer
|
||||
if err := writeIVFHeader(&ivf, "VP80", 2, 2, 30); err != nil {
|
||||
t.Fatalf("writeIVFHeader() error = %v", err)
|
||||
}
|
||||
if err := writeIVFFrame(&ivf, 1, []byte("frame")); err != nil {
|
||||
t.Fatalf("writeIVFFrame() error = %v", err)
|
||||
}
|
||||
enc := &ffmpegEncoder{stderr: &bytes.Buffer{}, frames: make(chan []byte, 2)}
|
||||
enc.readIVF(&ivf)
|
||||
if got := <-enc.frames; !bytes.Equal(got, []byte("frame")) {
|
||||
t.Fatalf("readIVF frame = %q", got)
|
||||
}
|
||||
|
||||
enc = &ffmpegEncoder{stderr: &bytes.Buffer{}, frames: make(chan []byte, 2)}
|
||||
enc.readRawH264(bytes.NewBufferString("h264"))
|
||||
if got := <-enc.frames; !bytes.Equal(got, []byte("h264")) {
|
||||
t.Fatalf("readRawH264 frame = %q", got)
|
||||
}
|
||||
|
||||
dec := &ffmpegDecoder{stderr: &bytes.Buffer{}, frames: make(chan []byte, 2), frameSize: 4}
|
||||
dec.readRawFrames(bytes.NewBufferString("aaaabbbb"))
|
||||
if got := <-dec.frames; !bytes.Equal(got, []byte("aaaa")) {
|
||||
t.Fatalf("readRawFrames first = %q", got)
|
||||
}
|
||||
if got := <-dec.frames; !bytes.Equal(got, []byte("bbbb")) {
|
||||
t.Fatalf("readRawFrames second = %q", got)
|
||||
}
|
||||
|
||||
h264In := &bufferWriteCloser{}
|
||||
dec = &ffmpegDecoder{stdin: h264In, mimeType: webrtc.MimeTypeH264}
|
||||
if err := dec.PushSample([]byte("sample")); err != nil {
|
||||
t.Fatalf("PushSample(h264) error = %v", err)
|
||||
}
|
||||
if h264In.String() != "sample" {
|
||||
t.Fatalf("h264 stdin = %q", h264In.String())
|
||||
}
|
||||
|
||||
ivfIn := &bufferWriteCloser{}
|
||||
dec = &ffmpegDecoder{stdin: ivfIn, mimeType: webrtc.MimeTypeVP8}
|
||||
if err := dec.PushSample([]byte("vp8")); err != nil {
|
||||
t.Fatalf("PushSample(vp8) error = %v", err)
|
||||
}
|
||||
if ivfIn.Len() != 12+len("vp8") || dec.pts != 1 {
|
||||
t.Fatalf("ivf stdin len=%d pts=%d", ivfIn.Len(), dec.pts)
|
||||
}
|
||||
|
||||
dec = &ffmpegDecoder{frames: make(chan []byte, 1)}
|
||||
dec.frames <- []byte("ready")
|
||||
if got, err := dec.PopFrame(); err != nil || !bytes.Equal(got, []byte("ready")) {
|
||||
t.Fatalf("PopFrame() = %q, %v", got, err)
|
||||
}
|
||||
}
|
||||
|
||||
226
internal/transport/videochannel/transport_unit_test.go
Normal file
226
internal/transport/videochannel/transport_unit_test.go
Normal file
@@ -0,0 +1,226 @@
|
||||
package videochannel
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"hash/crc32"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/openlibrecommunity/olcrtc/internal/carrier"
|
||||
"github.com/openlibrecommunity/olcrtc/internal/transport"
|
||||
"github.com/pion/webrtc/v4"
|
||||
)
|
||||
|
||||
type fakeVideoSession struct {
|
||||
stream *fakeVideoStream
|
||||
err error
|
||||
}
|
||||
|
||||
func (s *fakeVideoSession) Capabilities() carrier.Capabilities {
|
||||
return carrier.Capabilities{VideoTrack: true}
|
||||
}
|
||||
func (s *fakeVideoSession) OpenVideoTrack() (carrier.VideoTrack, error) {
|
||||
if s.err != nil {
|
||||
return nil, s.err
|
||||
}
|
||||
return s.stream, nil
|
||||
}
|
||||
|
||||
type fakeVideoStream struct {
|
||||
closeErr error
|
||||
canSend bool
|
||||
trackAdded bool
|
||||
trackCB func(*webrtc.TrackRemote, *webrtc.RTPReceiver)
|
||||
reconnect func()
|
||||
should func() bool
|
||||
ended func(string)
|
||||
watched bool
|
||||
closed bool
|
||||
}
|
||||
|
||||
func (s *fakeVideoStream) Connect(context.Context) error { return nil }
|
||||
func (s *fakeVideoStream) Close() error { s.closed = true; return s.closeErr }
|
||||
func (s *fakeVideoStream) SetReconnectCallback(cb func()) { s.reconnect = cb }
|
||||
func (s *fakeVideoStream) SetShouldReconnect(fn func() bool) { s.should = fn }
|
||||
func (s *fakeVideoStream) SetEndedCallback(cb func(string)) { s.ended = cb }
|
||||
func (s *fakeVideoStream) WatchConnection(context.Context) { s.watched = true }
|
||||
func (s *fakeVideoStream) CanSend() bool { return s.canSend }
|
||||
func (s *fakeVideoStream) AddTrack(webrtc.TrackLocal) error { s.trackAdded = true; return nil }
|
||||
func (s *fakeVideoStream) SetTrackHandler(cb func(*webrtc.TrackRemote, *webrtc.RTPReceiver)) {
|
||||
s.trackCB = cb
|
||||
}
|
||||
|
||||
type nonVideoSession struct{}
|
||||
|
||||
func (s *nonVideoSession) Capabilities() carrier.Capabilities { return carrier.Capabilities{} }
|
||||
|
||||
func TestNewCallbacksFeaturesAndClose(t *testing.T) {
|
||||
stream := &fakeVideoStream{canSend: true}
|
||||
name := "videochannel-unit-new"
|
||||
carrier.Register(name, func(context.Context, carrier.Config) (carrier.Session, error) {
|
||||
return &fakeVideoSession{stream: stream}, nil
|
||||
})
|
||||
|
||||
trIface, err := New(context.Background(), transport.Config{
|
||||
Carrier: name,
|
||||
VideoWidth: 320,
|
||||
VideoHeight: 240,
|
||||
VideoFPS: 30,
|
||||
VideoBitrate: "1M",
|
||||
VideoCodec: "qrcode",
|
||||
VideoTileModule: -1,
|
||||
VideoTileRS: -1,
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("New() error = %v", err)
|
||||
}
|
||||
tr := trIface.(*streamTransport)
|
||||
if !stream.trackAdded || stream.trackCB == nil {
|
||||
t.Fatal("New() did not attach track and handler")
|
||||
}
|
||||
tr.SetReconnectCallback(func() {})
|
||||
tr.SetShouldReconnect(func() bool { return true })
|
||||
tr.SetEndedCallback(func(string) {})
|
||||
tr.WatchConnection(context.Background())
|
||||
if stream.reconnect == nil || stream.should == nil || stream.ended == nil || !stream.watched {
|
||||
t.Fatal("callbacks/watch were not forwarded")
|
||||
}
|
||||
if !tr.CanSend() {
|
||||
t.Fatal("CanSend() = false, want true")
|
||||
}
|
||||
if features := tr.Features(); !features.Reliable || !features.Ordered || !features.MessageOriented || features.MaxPayloadSize == 0 {
|
||||
t.Fatalf("Features() = %+v", features)
|
||||
}
|
||||
if tr.videoQRSize != defaultFragmentSize || tr.videoTileModule != 4 || tr.videoTileRS != 20 {
|
||||
t.Fatalf("defaults qr=%d tileModule=%d tileRS=%d", tr.videoQRSize, tr.videoTileModule, tr.videoTileRS)
|
||||
}
|
||||
if err := tr.Close(); err != nil {
|
||||
t.Fatalf("Close() error = %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestNewErrorPaths(t *testing.T) {
|
||||
carrier.Register("videochannel-create-fails", func(context.Context, carrier.Config) (carrier.Session, error) {
|
||||
return nil, errors.New("boom")
|
||||
})
|
||||
if _, err := New(context.Background(), transport.Config{Carrier: "videochannel-create-fails"}); err == nil || err.Error() != "create provider transport: boom" {
|
||||
t.Fatalf("New() error = %v", err)
|
||||
}
|
||||
|
||||
carrier.Register("videochannel-no-video", func(context.Context, carrier.Config) (carrier.Session, error) {
|
||||
return &nonVideoSession{}, nil
|
||||
})
|
||||
if _, err := New(context.Background(), transport.Config{Carrier: "videochannel-no-video"}); !errors.Is(err, ErrVideoTrackUnsupported) {
|
||||
t.Fatalf("New() error = %v, want %v", err, ErrVideoTrackUnsupported)
|
||||
}
|
||||
|
||||
carrier.Register("videochannel-open-fails", func(context.Context, carrier.Config) (carrier.Session, error) {
|
||||
return &fakeVideoSession{err: errors.New("open boom")}, nil
|
||||
})
|
||||
if _, err := New(context.Background(), transport.Config{Carrier: "videochannel-open-fails"}); err == nil || err.Error() != "open video track: open boom" {
|
||||
t.Fatalf("New() error = %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestSendAckAndClosePaths(t *testing.T) {
|
||||
tr := &streamTransport{
|
||||
stream: &fakeVideoStream{canSend: true},
|
||||
outbound: make(chan []byte, 8),
|
||||
outboundAck: make(chan []byte, 8),
|
||||
closeCh: make(chan struct{}),
|
||||
writerDone: make(chan struct{}),
|
||||
ackWaiters: make(map[uint32]chan uint32),
|
||||
videoQRSize: 4,
|
||||
}
|
||||
|
||||
done := make(chan error, 1)
|
||||
payload := []byte("payload")
|
||||
go func() { done <- tr.Send(payload) }()
|
||||
|
||||
select {
|
||||
case frame := <-tr.outbound:
|
||||
decoded, err := decodeTransportFrame(frame)
|
||||
if err != nil {
|
||||
t.Fatalf("decodeTransportFrame() error = %v", err)
|
||||
}
|
||||
tr.resolveAck(decoded.seq, crc32.ChecksumIEEE(payload))
|
||||
case <-time.After(time.Second):
|
||||
t.Fatal("Send() did not enqueue frame")
|
||||
}
|
||||
|
||||
if err := <-done; err != nil {
|
||||
t.Fatalf("Send() error = %v", err)
|
||||
}
|
||||
if err := tr.Close(); err != nil {
|
||||
t.Fatalf("Close() error = %v", err)
|
||||
}
|
||||
if err := tr.Send([]byte("closed")); !errors.Is(err, ErrTransportClosed) {
|
||||
t.Fatalf("Send(closed) error = %v, want %v", err, ErrTransportClosed)
|
||||
}
|
||||
}
|
||||
|
||||
func TestOutboundPriorityRenderAndClosedEnqueue(t *testing.T) {
|
||||
tr := &streamTransport{
|
||||
stream: &fakeVideoStream{canSend: true},
|
||||
outbound: make(chan []byte, 2),
|
||||
outboundAck: make(chan []byte, 2),
|
||||
closeCh: make(chan struct{}),
|
||||
writerDone: make(chan struct{}),
|
||||
videoW: 16,
|
||||
videoH: 16,
|
||||
videoQRRecovery: "highest",
|
||||
videoCodec: "qrcode",
|
||||
videoTileModule: 4,
|
||||
videoTileRS: 20,
|
||||
}
|
||||
|
||||
if err := tr.enqueueFrame([]byte("data"), false); err != nil {
|
||||
t.Fatalf("enqueueFrame(data) error = %v", err)
|
||||
}
|
||||
if err := tr.enqueueFrame([]byte("ack"), true); err != nil {
|
||||
t.Fatalf("enqueueFrame(ack) error = %v", err)
|
||||
}
|
||||
if got, ok := tr.nextOutboundFrame(); !ok || string(got) != "ack" {
|
||||
t.Fatalf("first nextOutboundFrame() = %q/%v, want ack/true", got, ok)
|
||||
}
|
||||
if got, ok := tr.nextOutboundFrame(); !ok || string(got) != "data" {
|
||||
t.Fatalf("second nextOutboundFrame() = %q/%v, want data/true", got, ok)
|
||||
}
|
||||
if got, ok := tr.nextOutboundFrame(); !ok || got != nil {
|
||||
t.Fatalf("idle nextOutboundFrame() = %q/%v, want nil/true", got, ok)
|
||||
}
|
||||
|
||||
idle, err := tr.renderFrame(nil)
|
||||
if err != nil {
|
||||
t.Fatalf("renderFrame(nil) error = %v", err)
|
||||
}
|
||||
if len(idle) != tr.videoW*tr.videoH {
|
||||
t.Fatalf("idle frame len = %d, want %d", len(idle), tr.videoW*tr.videoH)
|
||||
}
|
||||
if features := tr.Features(); features.MaxPayloadSize != defaultMaxPayloadSize {
|
||||
t.Fatalf("Features() = %+v", features)
|
||||
}
|
||||
|
||||
tr.videoQRSize = defaultMaxPayloadSize
|
||||
if features := tr.Features(); features.MaxPayloadSize <= defaultMaxPayloadSize {
|
||||
t.Fatalf("Features(large qr) = %+v", features)
|
||||
}
|
||||
|
||||
tr.closed.Store(true)
|
||||
if err := tr.enqueueFrame([]byte("closed"), false); !errors.Is(err, ErrTransportClosed) {
|
||||
t.Fatalf("enqueueFrame(closed) error = %v, want %v", err, ErrTransportClosed)
|
||||
}
|
||||
}
|
||||
|
||||
func TestNextOutboundFrameStopsWhenClosed(t *testing.T) {
|
||||
tr := &streamTransport{
|
||||
outbound: make(chan []byte, 1),
|
||||
outboundAck: make(chan []byte, 1),
|
||||
closeCh: make(chan struct{}),
|
||||
}
|
||||
close(tr.closeCh)
|
||||
if got, ok := tr.nextOutboundFrame(); ok || got != nil {
|
||||
t.Fatalf("nextOutboundFrame(closed) = %q/%v, want nil/false", got, ok)
|
||||
}
|
||||
}
|
||||
272
internal/transport/vp8channel/transport_unit_test.go
Normal file
272
internal/transport/vp8channel/transport_unit_test.go
Normal file
@@ -0,0 +1,272 @@
|
||||
package vp8channel
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/binary"
|
||||
"errors"
|
||||
"testing"
|
||||
|
||||
"github.com/openlibrecommunity/olcrtc/internal/carrier"
|
||||
"github.com/openlibrecommunity/olcrtc/internal/transport"
|
||||
"github.com/pion/rtp"
|
||||
"github.com/pion/webrtc/v4"
|
||||
)
|
||||
|
||||
type fakeVideoSession struct {
|
||||
stream *fakeVideoStream
|
||||
err error
|
||||
}
|
||||
|
||||
func (s *fakeVideoSession) Capabilities() carrier.Capabilities {
|
||||
return carrier.Capabilities{VideoTrack: true}
|
||||
}
|
||||
func (s *fakeVideoSession) OpenVideoTrack() (carrier.VideoTrack, error) {
|
||||
if s.err != nil {
|
||||
return nil, s.err
|
||||
}
|
||||
return s.stream, nil
|
||||
}
|
||||
|
||||
type fakeVideoStream struct {
|
||||
connectErr error
|
||||
closeErr error
|
||||
canSend bool
|
||||
trackAdded bool
|
||||
trackCB func(*webrtc.TrackRemote, *webrtc.RTPReceiver)
|
||||
reconnect func()
|
||||
should func() bool
|
||||
ended func(string)
|
||||
watched bool
|
||||
closed bool
|
||||
}
|
||||
|
||||
func (s *fakeVideoStream) Connect(context.Context) error { return s.connectErr }
|
||||
func (s *fakeVideoStream) Close() error {
|
||||
s.closed = true
|
||||
return s.closeErr
|
||||
}
|
||||
func (s *fakeVideoStream) SetReconnectCallback(cb func()) { s.reconnect = cb }
|
||||
func (s *fakeVideoStream) SetShouldReconnect(fn func() bool) { s.should = fn }
|
||||
func (s *fakeVideoStream) SetEndedCallback(cb func(string)) { s.ended = cb }
|
||||
func (s *fakeVideoStream) WatchConnection(context.Context) { s.watched = true }
|
||||
func (s *fakeVideoStream) CanSend() bool { return s.canSend }
|
||||
func (s *fakeVideoStream) AddTrack(webrtc.TrackLocal) error { s.trackAdded = true; return nil }
|
||||
func (s *fakeVideoStream) SetTrackHandler(cb func(*webrtc.TrackRemote, *webrtc.RTPReceiver)) {
|
||||
s.trackCB = cb
|
||||
}
|
||||
|
||||
type nonVideoSession struct{}
|
||||
|
||||
func (s *nonVideoSession) Capabilities() carrier.Capabilities { return carrier.Capabilities{} }
|
||||
|
||||
func TestNewConnectSendCallbacksFeaturesAndClose(t *testing.T) {
|
||||
stream := &fakeVideoStream{canSend: true}
|
||||
name := "vp8channel-unit-new"
|
||||
carrier.Register(name, func(context.Context, carrier.Config) (carrier.Session, error) {
|
||||
return &fakeVideoSession{stream: stream}, nil
|
||||
})
|
||||
|
||||
trIface, err := New(context.Background(), transport.Config{
|
||||
Carrier: name,
|
||||
ClientID: "client",
|
||||
VP8FPS: 30,
|
||||
VP8BatchSize: 1,
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("New() error = %v", err)
|
||||
}
|
||||
tr := trIface.(*streamTransport)
|
||||
if !stream.trackAdded || stream.trackCB == nil {
|
||||
t.Fatal("New() did not attach track and handler")
|
||||
}
|
||||
if err := tr.Connect(context.Background()); err != nil {
|
||||
t.Fatalf("Connect() error = %v", err)
|
||||
}
|
||||
if tr.kcp == nil || !tr.writerUp.Load() {
|
||||
t.Fatal("Connect() did not initialize kcp/writer")
|
||||
}
|
||||
tr.SetReconnectCallback(func() {})
|
||||
tr.SetShouldReconnect(func() bool { return true })
|
||||
tr.SetEndedCallback(func(string) {})
|
||||
tr.WatchConnection(context.Background())
|
||||
if stream.reconnect == nil || stream.should == nil || stream.ended == nil || !stream.watched {
|
||||
t.Fatal("callbacks/watch were not forwarded")
|
||||
}
|
||||
if !tr.CanSend() {
|
||||
t.Fatal("CanSend() = false, want true")
|
||||
}
|
||||
if features := tr.Features(); !features.Reliable || !features.Ordered || !features.MessageOriented || features.MaxPayloadSize == 0 {
|
||||
t.Fatalf("Features() = %+v", features)
|
||||
}
|
||||
if err := tr.Send([]byte("payload")); err != nil {
|
||||
t.Fatalf("Send() error = %v", err)
|
||||
}
|
||||
tr.drainOutbound()
|
||||
if err := tr.Close(); err != nil {
|
||||
t.Fatalf("Close() error = %v", err)
|
||||
}
|
||||
if err := tr.Send([]byte("closed")); !errors.Is(err, ErrTransportClosed) {
|
||||
t.Fatalf("Send(closed) error = %v, want %v", err, ErrTransportClosed)
|
||||
}
|
||||
}
|
||||
|
||||
func TestNewErrorPaths(t *testing.T) {
|
||||
carrier.Register("vp8channel-create-fails", func(context.Context, carrier.Config) (carrier.Session, error) {
|
||||
return nil, errors.New("boom")
|
||||
})
|
||||
if _, err := New(context.Background(), transport.Config{Carrier: "vp8channel-create-fails"}); err == nil || err.Error() != "create provider transport: boom" {
|
||||
t.Fatalf("New() error = %v", err)
|
||||
}
|
||||
|
||||
carrier.Register("vp8channel-no-video", func(context.Context, carrier.Config) (carrier.Session, error) {
|
||||
return &nonVideoSession{}, nil
|
||||
})
|
||||
if _, err := New(context.Background(), transport.Config{Carrier: "vp8channel-no-video"}); !errors.Is(err, ErrVideoTrackUnsupported) {
|
||||
t.Fatalf("New() error = %v, want %v", err, ErrVideoTrackUnsupported)
|
||||
}
|
||||
|
||||
carrier.Register("vp8channel-open-fails", func(context.Context, carrier.Config) (carrier.Session, error) {
|
||||
return &fakeVideoSession{err: errors.New("open boom")}, nil
|
||||
})
|
||||
if _, err := New(context.Background(), transport.Config{Carrier: "vp8channel-open-fails"}); err == nil || err.Error() != "open video track: open boom" {
|
||||
t.Fatalf("New() error = %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestEpochHeaderTokenAndOutboundCapacity(t *testing.T) {
|
||||
tr := &streamTransport{
|
||||
stream: &fakeVideoStream{canSend: true},
|
||||
outbound: make(chan []byte, 10),
|
||||
closeCh: make(chan struct{}),
|
||||
writerDone: make(chan struct{}),
|
||||
bindingToken: bindingToken("client"),
|
||||
localEpoch: 0x01020304,
|
||||
}
|
||||
|
||||
hdr := tr.epochHeader()
|
||||
if hdr[0] != kcpFrameMagic ||
|
||||
binary.BigEndian.Uint32(hdr[tokenOff:epochOff]) != tr.bindingToken ||
|
||||
binary.BigEndian.Uint32(hdr[epochOff:]) != tr.localEpoch {
|
||||
t.Fatalf("epochHeader() = %x", hdr)
|
||||
}
|
||||
if bindingToken("") == 0 || randomEpoch() == 0 {
|
||||
t.Fatal("bindingToken/randomEpoch returned zero")
|
||||
}
|
||||
|
||||
for len(tr.outbound) < cap(tr.outbound)*canSendHighWatermark/100 {
|
||||
tr.outbound <- []byte("queued")
|
||||
}
|
||||
if tr.CanSend() {
|
||||
t.Fatal("CanSend() = true at high watermark")
|
||||
}
|
||||
tr.drainOutbound()
|
||||
if !tr.CanSend() {
|
||||
t.Fatal("CanSend() = false after drain")
|
||||
}
|
||||
tr.closed.Store(true)
|
||||
if tr.CanSend() {
|
||||
t.Fatal("CanSend() = true after closed")
|
||||
}
|
||||
}
|
||||
|
||||
func TestVP8FrameStateAssemblesAndRejectsCorruptFrames(t *testing.T) {
|
||||
frame := append([]byte{kcpFrameMagic}, bytes.Repeat([]byte{0x01}, epochHdrLen)...)
|
||||
var state vp8FrameState
|
||||
|
||||
got := state.processRTPPacket(&rtp.Packet{
|
||||
Header: rtp.Header{SequenceNumber: 10, Marker: true},
|
||||
Payload: append([]byte{0x10}, frame...),
|
||||
})
|
||||
if !bytes.Equal(got, frame) {
|
||||
t.Fatalf("single-packet frame = %x, want %x", got, frame)
|
||||
}
|
||||
|
||||
state = vp8FrameState{}
|
||||
if got := state.processRTPPacket(&rtp.Packet{
|
||||
Header: rtp.Header{SequenceNumber: 20},
|
||||
Payload: append([]byte{0x10}, frame[:4]...),
|
||||
}); got != nil {
|
||||
t.Fatalf("partial frame = %x, want nil", got)
|
||||
}
|
||||
got = state.processRTPPacket(&rtp.Packet{
|
||||
Header: rtp.Header{SequenceNumber: 21, Marker: true},
|
||||
Payload: append([]byte{0x00}, frame[4:]...),
|
||||
})
|
||||
if !bytes.Equal(got, frame) {
|
||||
t.Fatalf("fragmented frame = %x, want %x", got, frame)
|
||||
}
|
||||
|
||||
state = vp8FrameState{}
|
||||
_ = state.processRTPPacket(&rtp.Packet{
|
||||
Header: rtp.Header{SequenceNumber: 30},
|
||||
Payload: append([]byte{0x10}, frame[:4]...),
|
||||
})
|
||||
if got := state.processRTPPacket(&rtp.Packet{
|
||||
Header: rtp.Header{SequenceNumber: 32, Marker: true},
|
||||
Payload: append([]byte{0x00}, frame[4:]...),
|
||||
}); got != nil {
|
||||
t.Fatalf("frame after sequence gap = %x, want nil", got)
|
||||
}
|
||||
|
||||
state = vp8FrameState{}
|
||||
if got := state.processRTPPacket(&rtp.Packet{
|
||||
Header: rtp.Header{SequenceNumber: 40, Marker: true},
|
||||
Payload: []byte{},
|
||||
}); got != nil {
|
||||
t.Fatalf("bad vp8 payload = %x, want nil", got)
|
||||
}
|
||||
}
|
||||
|
||||
func TestHandleIncomingFrameEpochFilteringAndReconnect(t *testing.T) {
|
||||
called := 0
|
||||
tr := &streamTransport{
|
||||
stream: &fakeVideoStream{canSend: true},
|
||||
outbound: make(chan []byte, 16),
|
||||
closeCh: make(chan struct{}),
|
||||
writerDone: make(chan struct{}),
|
||||
bindingToken: bindingToken("client"),
|
||||
localEpoch: 0x100,
|
||||
onData: func([]byte) { called++ },
|
||||
}
|
||||
defer func() {
|
||||
_ = tr.Close()
|
||||
}()
|
||||
|
||||
mkFrame := func(token, epoch uint32, payload []byte) []byte {
|
||||
frame := make([]byte, epochHdrLen+len(payload))
|
||||
frame[0] = kcpFrameMagic
|
||||
binary.BigEndian.PutUint32(frame[tokenOff:epochOff], token)
|
||||
binary.BigEndian.PutUint32(frame[epochOff:epochHdrLen], epoch)
|
||||
copy(frame[epochHdrLen:], payload)
|
||||
return frame
|
||||
}
|
||||
|
||||
tr.handleIncomingFrame(mkFrame(bindingToken("other"), 1, []byte("x")))
|
||||
tr.handleIncomingFrame(mkFrame(tr.bindingToken, tr.localEpoch, []byte("self")))
|
||||
tr.handleIncomingFrame(mkFrame(tr.bindingToken, 1, nil))
|
||||
if tr.hadPeer.Load() || called != 0 {
|
||||
t.Fatal("filtered frames changed peer state")
|
||||
}
|
||||
|
||||
tr.handleIncomingFrame(mkFrame(tr.bindingToken, 1, []byte("first")))
|
||||
if !tr.hadPeer.Load() || tr.peerEpoch.Load() != 1 {
|
||||
t.Fatalf("peer state after first frame: had=%v epoch=%d", tr.hadPeer.Load(), tr.peerEpoch.Load())
|
||||
}
|
||||
|
||||
reconnected := false
|
||||
tr.SetReconnectCallback(func() { reconnected = true })
|
||||
stream := tr.stream.(*fakeVideoStream)
|
||||
if stream.reconnect == nil {
|
||||
t.Fatal("SetReconnectCallback did not install stream callback")
|
||||
}
|
||||
stream.reconnect()
|
||||
if !reconnected || tr.kcp == nil {
|
||||
t.Fatalf("stream reconnect did not reset/callback: reconnected=%v kcp=%v", reconnected, tr.kcp)
|
||||
}
|
||||
reconnected = false
|
||||
tr.handleIncomingFrame(mkFrame(tr.bindingToken, 2, []byte("after-restart")))
|
||||
if !reconnected || tr.peerEpoch.Load() != 2 || tr.kcp == nil {
|
||||
t.Fatalf("epoch change did not reset/reconnect: reconnected=%v epoch=%d kcp=%v", reconnected, tr.peerEpoch.Load(), tr.kcp)
|
||||
}
|
||||
}
|
||||
@@ -52,14 +52,15 @@ const (
|
||||
|
||||
//nolint:gochecknoglobals // Mobile bindings expose a singleton runtime controlled by the embedding app.
|
||||
var (
|
||||
mu sync.Mutex
|
||||
defaults mobileConfig
|
||||
defaultsSet sync.Once
|
||||
registerSet sync.Once
|
||||
cancel context.CancelFunc
|
||||
done chan struct{}
|
||||
ready chan struct{}
|
||||
errRun error
|
||||
mu sync.Mutex
|
||||
defaults mobileConfig
|
||||
defaultsSet sync.Once
|
||||
registerSet sync.Once
|
||||
runClientWithReady = client.RunWithReady
|
||||
cancel context.CancelFunc
|
||||
done chan struct{}
|
||||
ready chan struct{}
|
||||
errRun error
|
||||
)
|
||||
|
||||
type mobileConfig struct {
|
||||
@@ -200,7 +201,7 @@ func Check(
|
||||
startedAt := time.Now()
|
||||
|
||||
go func() {
|
||||
doneCh <- client.RunWithReady(
|
||||
doneCh <- runClientWithReady(
|
||||
ctx,
|
||||
defaultLink,
|
||||
transportName,
|
||||
@@ -288,7 +289,7 @@ func startWithConfig(
|
||||
go func() {
|
||||
defer cancelFunc()
|
||||
|
||||
err := client.RunWithReady(
|
||||
err := runClientWithReady(
|
||||
ctx,
|
||||
cfg.link,
|
||||
cfg.transport,
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
package mobile
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"log"
|
||||
"strings"
|
||||
@@ -39,6 +40,7 @@ func resetMobileGlobals(t *testing.T) {
|
||||
done = nil
|
||||
ready = nil
|
||||
errRun = nil
|
||||
runClientWithReady = clientRunWithReady
|
||||
defaults = mobileConfig{}
|
||||
defaultsSet = sync.Once{}
|
||||
mu.Unlock()
|
||||
@@ -46,6 +48,8 @@ func resetMobileGlobals(t *testing.T) {
|
||||
logger.SetVerbose(false)
|
||||
}
|
||||
|
||||
var clientRunWithReady = runClientWithReady
|
||||
|
||||
func TestProtectorAndLogging(t *testing.T) {
|
||||
resetMobileGlobals(t)
|
||||
p := &testProtector{}
|
||||
@@ -151,6 +155,192 @@ func TestStartValidation(t *testing.T) {
|
||||
resetMobileGlobals(t)
|
||||
}
|
||||
|
||||
func TestStartWithInjectedRunnerLifecycle(t *testing.T) {
|
||||
resetMobileGlobals(t)
|
||||
t.Cleanup(func() {
|
||||
resetMobileGlobals(t)
|
||||
})
|
||||
|
||||
runClientWithReady = func(
|
||||
ctx context.Context,
|
||||
linkName, transportName, carrierName, roomURL, keyHex, clientID string,
|
||||
localAddr string,
|
||||
dnsServer, socksUser, socksPass string,
|
||||
onReady func(),
|
||||
videoWidth int,
|
||||
videoHeight int,
|
||||
videoFPS int,
|
||||
videoBitrate string,
|
||||
videoHW string,
|
||||
videoQRSize int,
|
||||
videoQRRecovery string,
|
||||
videoCodec string,
|
||||
videoTileModule int,
|
||||
videoTileRS int,
|
||||
vp8FPS int,
|
||||
vp8BatchSize int,
|
||||
) error {
|
||||
if linkName != defaultLink || transportName != dataTransport || carrierName != carrierJazz ||
|
||||
roomURL != "any" || clientID != "client" || localAddr != "127.0.0.1:1080" ||
|
||||
dnsServer != defaultDNSServer || vp8FPS != 60 || vp8BatchSize != 8 {
|
||||
t.Fatalf("RunWithReady args mismatch: link=%q transport=%q carrier=%q room=%q client=%q local=%q dns=%q vp8=%d/%d",
|
||||
linkName, transportName, carrierName, roomURL, clientID, localAddr, dnsServer, vp8FPS, vp8BatchSize)
|
||||
}
|
||||
onReady()
|
||||
<-ctx.Done()
|
||||
return ctx.Err()
|
||||
}
|
||||
|
||||
if err := StartWithTransport(carrierJazz, "dc", "", "client", "key", 1080, "", ""); err != nil {
|
||||
t.Fatalf("StartWithTransport() error = %v", err)
|
||||
}
|
||||
if !IsRunning() {
|
||||
t.Fatal("IsRunning() = false, want true")
|
||||
}
|
||||
if err := WaitReady(100); err != nil {
|
||||
t.Fatalf("WaitReady() error = %v", err)
|
||||
}
|
||||
Stop()
|
||||
if IsRunning() {
|
||||
t.Fatal("IsRunning() = true after Stop")
|
||||
}
|
||||
}
|
||||
|
||||
func TestStartUsesDefaultsAndCheckWithInjectedRunner(t *testing.T) {
|
||||
resetMobileGlobals(t)
|
||||
t.Cleanup(func() {
|
||||
resetMobileGlobals(t)
|
||||
})
|
||||
|
||||
runClientWithReady = func(
|
||||
ctx context.Context,
|
||||
linkName, transportName, carrierName, roomURL, keyHex, clientID string,
|
||||
localAddr string,
|
||||
dnsServer, socksUser, socksPass string,
|
||||
onReady func(),
|
||||
videoWidth int,
|
||||
videoHeight int,
|
||||
videoFPS int,
|
||||
videoBitrate string,
|
||||
videoHW string,
|
||||
videoQRSize int,
|
||||
videoQRRecovery string,
|
||||
videoCodec string,
|
||||
videoTileModule int,
|
||||
videoTileRS int,
|
||||
vp8FPS int,
|
||||
vp8BatchSize int,
|
||||
) error {
|
||||
if transportName != defaultTransport || roomURL != "https://telemost.yandex.ru/j/room" ||
|
||||
localAddr != "127.0.0.1:1081" || socksUser != "u" || socksPass != "p" {
|
||||
t.Fatalf("Start args mismatch: transport=%q room=%q local=%q user/pass=%q/%q",
|
||||
transportName, roomURL, localAddr, socksUser, socksPass)
|
||||
}
|
||||
onReady()
|
||||
<-ctx.Done()
|
||||
return ctx.Err()
|
||||
}
|
||||
|
||||
if err := Start("telemost", "room", "client", "key", 1081, "u", "p"); err != nil {
|
||||
t.Fatalf("Start() error = %v", err)
|
||||
}
|
||||
if err := WaitReady(100); err != nil {
|
||||
t.Fatalf("WaitReady() error = %v", err)
|
||||
}
|
||||
Stop()
|
||||
|
||||
runClientWithReady = func(
|
||||
ctx context.Context,
|
||||
linkName, transportName, carrierName, roomURL, keyHex, clientID string,
|
||||
localAddr string,
|
||||
dnsServer, socksUser, socksPass string,
|
||||
onReady func(),
|
||||
videoWidth int,
|
||||
videoHeight int,
|
||||
videoFPS int,
|
||||
videoBitrate string,
|
||||
videoHW string,
|
||||
videoQRSize int,
|
||||
videoQRRecovery string,
|
||||
videoCodec string,
|
||||
videoTileModule int,
|
||||
videoTileRS int,
|
||||
vp8FPS int,
|
||||
vp8BatchSize int,
|
||||
) error {
|
||||
if transportName != dataTransport || vp8FPS != 1 || vp8BatchSize != 64 {
|
||||
t.Fatalf("Check args mismatch: transport=%q vp8=%d/%d", transportName, vp8FPS, vp8BatchSize)
|
||||
}
|
||||
onReady()
|
||||
<-ctx.Done()
|
||||
return nil
|
||||
}
|
||||
elapsed, err := Check("jazz", "dc", "", "client", "key", 1082, 100, -1, 999)
|
||||
if err != nil {
|
||||
t.Fatalf("Check() error = %v", err)
|
||||
}
|
||||
if elapsed < 0 {
|
||||
t.Fatalf("Check() elapsed = %d", elapsed)
|
||||
}
|
||||
}
|
||||
|
||||
func TestCheckTimeoutAndRunError(t *testing.T) {
|
||||
resetMobileGlobals(t)
|
||||
t.Cleanup(func() {
|
||||
resetMobileGlobals(t)
|
||||
})
|
||||
|
||||
runClientWithReady = func(
|
||||
ctx context.Context,
|
||||
linkName, transportName, carrierName, roomURL, keyHex, clientID string,
|
||||
localAddr string,
|
||||
dnsServer, socksUser, socksPass string,
|
||||
onReady func(),
|
||||
videoWidth int,
|
||||
videoHeight int,
|
||||
videoFPS int,
|
||||
videoBitrate string,
|
||||
videoHW string,
|
||||
videoQRSize int,
|
||||
videoQRRecovery string,
|
||||
videoCodec string,
|
||||
videoTileModule int,
|
||||
videoTileRS int,
|
||||
vp8FPS int,
|
||||
vp8BatchSize int,
|
||||
) error {
|
||||
<-ctx.Done()
|
||||
return nil
|
||||
}
|
||||
if _, err := Check("telemost", defaultTransport, "room", "client", "key", 1083, 1, 30, 1); !errors.Is(err, errStartTimedOut) {
|
||||
t.Fatalf("Check(timeout) error = %v, want %v", err, errStartTimedOut)
|
||||
}
|
||||
|
||||
want := errors.New("check failed")
|
||||
runClientWithReady = func(
|
||||
context.Context,
|
||||
string, string, string, string, string, string,
|
||||
string,
|
||||
string, string, string,
|
||||
func(),
|
||||
int, int, int,
|
||||
string,
|
||||
string,
|
||||
int,
|
||||
string,
|
||||
string,
|
||||
int,
|
||||
int,
|
||||
int,
|
||||
int,
|
||||
) error {
|
||||
return want
|
||||
}
|
||||
if _, err := Check("telemost", defaultTransport, "room", "client", "key", 1084, 100, 30, 1); !errors.Is(err, want) {
|
||||
t.Fatalf("Check(run error) = %v, want %v", err, want)
|
||||
}
|
||||
}
|
||||
|
||||
func TestWaitReadyStatesAndStop(t *testing.T) {
|
||||
resetMobileGlobals(t)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user