Files
olcrtc/internal/e2e/tunnel_test.go
2026-05-16 02:10:33 +03:00

1518 lines
40 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package e2e
import (
"bufio"
"bytes"
"context"
"encoding/binary"
"errors"
"flag"
"fmt"
"io"
"net"
"strconv"
"strings"
"sync"
"testing"
"time"
"github.com/openlibrecommunity/olcrtc/internal/app/session"
"github.com/openlibrecommunity/olcrtc/internal/auth"
authSaluteJazz "github.com/openlibrecommunity/olcrtc/internal/auth/salutejazz"
authWBStream "github.com/openlibrecommunity/olcrtc/internal/auth/wbstream"
"github.com/openlibrecommunity/olcrtc/internal/carrier"
"github.com/openlibrecommunity/olcrtc/internal/client"
"github.com/openlibrecommunity/olcrtc/internal/link"
"github.com/openlibrecommunity/olcrtc/internal/server"
"github.com/openlibrecommunity/olcrtc/internal/supervisor"
"github.com/pion/webrtc/v4"
)
const (
testKeyHex = "00112233445566778899aabbccddeeff00112233445566778899aabbccddeeff"
transportData = "datachannel"
transportVideo = "videochannel"
transportSEI = "seichannel"
transportVP8 = "vp8channel"
linkDirect = "direct"
testRoom = "room"
localDNSServer = "127.0.0.1:53"
videoHWNone = "none"
testClientDeviceID = "client-1"
)
var (
errRealE2ENotReady = errors.New("real e2e client did not become ready")
errTunnelDidNotStop = errors.New("tunnel goroutine did not stop")
errRealE2EEchoMismatch = errors.New("real e2e echo payload mismatch")
errSocksUnexpectedReply = errors.New("unexpected SOCKS5 reply")
errSocksUnexpectedHello = errors.New("unexpected SOCKS5 greeting")
errPayloadMismatchOffset = errors.New("payload mismatch at offset")
errFailoverCarrier = errors.New("intentional failover carrier failure")
)
var (
realE2E = flag.Bool( //nolint:gochecknoglobals // package-level state intentional
"olcrtc.real-e2e",
false,
"run real provider e2e matrix against external WebRTC services",
)
realE2ECarriers = flag.String( //nolint:gochecknoglobals // package-level state intentional
"olcrtc.real-carriers",
"jitsi,telemost,wbstream",
"comma-separated carriers for real e2e",
)
realE2ETransports = flag.String( //nolint:gochecknoglobals // package-level state intentional
"olcrtc.real-transports",
"datachannel,videochannel,seichannel,vp8channel",
"comma-separated transports for real e2e",
)
realE2EJazzRoom = flag.String( //nolint:gochecknoglobals // package-level state intentional
"olcrtc.real-jazz-room",
"",
"SaluteJazz room for real e2e, format roomID:password; autogenerated when empty",
)
realE2ETelemostRoom = flag.String( //nolint:gochecknoglobals // package-level state intentional
"olcrtc.real-telemost-room",
"41514917109506",
"Telemost room URL or id for real e2e",
)
realE2EWBStreamRoom = flag.String( //nolint:gochecknoglobals // package-level state intentional
"olcrtc.real-wbstream-room",
"019e23c2-a580-7550-b08a-7ac5342ca21f",
"WB Stream room id for real e2e; autogenerated when empty",
)
realE2EJitsiRoom = flag.String( //nolint:gochecknoglobals // package-level state intentional
"olcrtc.real-jitsi-room",
"https://meet.cryptopro.ru/deadbeef",
"Jitsi Meet room URL for real e2e (format https://host/room or host/room)",
)
realE2ETimeout = flag.Duration( //nolint:gochecknoglobals // package-level state intentional
"olcrtc.real-timeout",
90*time.Second,
"timeout per real e2e provider/transport case",
)
)
type realE2EExpectation int
const (
realE2EExpectFail realE2EExpectation = iota
realE2EExpectPass
// realE2EExpectUnstable marks a carrier×transport combo that is
// known to flap: it sometimes succeeds and sometimes fails for
// reasons outside our control (third-party server load, lossy SFU
// paths, etc.). The matrix runner records the outcome but does
// not fail the test either way. Use this sparingly — prefer
// ExpectPass / ExpectFail when the behaviour is deterministic.
realE2EExpectUnstable
)
type memorySession struct {
stream *memoryStream
}
func (s *memorySession) Capabilities() carrier.Capabilities {
return carrier.Capabilities{ByteStream: true, VideoTrack: true}
}
func (s *memorySession) OpenByteStream() (carrier.ByteStream, error) {
return s.stream, nil
}
func (s *memorySession) OpenVideoTrack() (carrier.VideoTrack, error) {
return s.stream, nil
}
type memoryRoom struct {
mu sync.Mutex
streams map[*memoryStream]struct{}
}
func (r *memoryRoom) connectedCount() int {
r.mu.Lock()
defer r.mu.Unlock()
count := 0
for stream := range r.streams {
if stream.isConnected() {
count++
}
}
return count
}
func (r *memoryRoom) waitConnected(t *testing.T, want int) {
t.Helper()
deadline := time.Now().Add(3 * time.Second)
for time.Now().Before(deadline) {
if r.connectedCount() >= want {
return
}
time.Sleep(10 * time.Millisecond)
}
t.Fatalf("memory room connected streams = %d, want at least %d", r.connectedCount(), want)
}
func (r *memoryRoom) triggerReconnect() {
r.mu.Lock()
streams := make([]*memoryStream, 0, len(r.streams))
for stream := range r.streams {
streams = append(streams, stream)
}
r.mu.Unlock()
var wg sync.WaitGroup
for _, stream := range streams {
wg.Add(1)
go func() {
defer wg.Done()
stream.triggerReconnect()
}()
}
wg.Wait()
}
func (r *memoryRoom) triggerEnded(reason string) {
r.mu.Lock()
streams := make([]*memoryStream, 0, len(r.streams))
for stream := range r.streams {
streams = append(streams, stream)
}
r.mu.Unlock()
for _, stream := range streams {
stream.triggerEnded(reason)
}
}
type memoryStream struct {
room *memoryRoom
onData func([]byte)
mu sync.Mutex
connected bool
closed bool
reconnect func()
ended func(string)
track webrtc.TrackLocal
trackCB func(*webrtc.TrackRemote, *webrtc.RTPReceiver)
pending [][]byte
}
func (s *memoryStream) Connect(context.Context) error {
s.mu.Lock()
s.connected = true
pending := s.pending
s.pending = nil
onData := s.onData
s.mu.Unlock()
for _, payload := range pending {
if onData != nil {
onData(payload)
}
}
return nil
}
func (s *memoryStream) Send(data []byte) error {
s.mu.Lock()
if s.closed {
s.mu.Unlock()
return io.ErrClosedPipe
}
s.mu.Unlock()
payload := append([]byte(nil), data...)
s.room.mu.Lock()
peers := make([]*memoryStream, 0, len(s.room.streams))
for peer := range s.room.streams {
if peer != s {
peers = append(peers, peer)
}
}
s.room.mu.Unlock()
for _, peer := range peers {
peer.deliver(payload)
}
return nil
}
func (s *memoryStream) deliver(data []byte) {
s.mu.Lock()
if !s.connected && !s.closed {
s.pending = append(s.pending, append([]byte(nil), data...))
s.mu.Unlock()
return
}
ready := !s.closed && s.onData != nil
onData := s.onData
s.mu.Unlock()
if ready {
onData(append([]byte(nil), data...))
}
}
func (s *memoryStream) Close() error {
s.mu.Lock()
s.closed = true
s.connected = false
s.mu.Unlock()
return nil
}
func (s *memoryStream) SetReconnectCallback(cb func()) {
s.mu.Lock()
s.reconnect = cb
s.mu.Unlock()
}
func (s *memoryStream) SetShouldReconnect(func() bool) {}
func (s *memoryStream) SetEndedCallback(cb func(string)) {
s.mu.Lock()
s.ended = cb
s.mu.Unlock()
}
func (s *memoryStream) WatchConnection(ctx context.Context) {
<-ctx.Done()
}
func (s *memoryStream) CanSend() bool {
return s.isConnected()
}
func (s *memoryStream) AddTrack(track webrtc.TrackLocal) error {
s.mu.Lock()
s.track = track
s.mu.Unlock()
return nil
}
func (s *memoryStream) SetTrackHandler(cb func(*webrtc.TrackRemote, *webrtc.RTPReceiver)) {
s.mu.Lock()
s.trackCB = cb
s.mu.Unlock()
}
func (s *memoryStream) isConnected() bool {
s.mu.Lock()
defer s.mu.Unlock()
return s.connected && !s.closed
}
func (s *memoryStream) triggerReconnect() {
s.mu.Lock()
reconnect := s.reconnect
ready := s.connected && !s.closed && reconnect != nil
s.mu.Unlock()
if ready {
reconnect()
}
}
func (s *memoryStream) triggerEnded(reason string) {
s.mu.Lock()
ended := s.ended
ready := s.connected && !s.closed && ended != nil
s.mu.Unlock()
if ready {
ended(reason)
}
}
func registerMemoryCarrier(t *testing.T) (string, *memoryRoom) {
t.Helper()
session.RegisterDefaults()
name := "e2e-memory-" + t.Name()
room := &memoryRoom{streams: make(map[*memoryStream]struct{})}
carrier.Register(name, func(_ context.Context, cfg carrier.Config) (carrier.Session, error) {
stream := &memoryStream{room: room, onData: cfg.OnData}
room.mu.Lock()
room.streams[stream] = struct{}{}
room.mu.Unlock()
return &memorySession{stream: stream}, nil
})
return name, room
}
func registerMemoryCarrierAs(t *testing.T, name string) {
t.Helper()
room := &memoryRoom{streams: make(map[*memoryStream]struct{})}
carrier.Register(name, func(_ context.Context, cfg carrier.Config) (carrier.Session, error) {
stream := &memoryStream{room: room, onData: cfg.OnData}
room.mu.Lock()
room.streams[stream] = struct{}{}
room.mu.Unlock()
return &memorySession{stream: stream}, nil
})
}
func registerFailingCarrier(t *testing.T) string {
t.Helper()
session.RegisterDefaults()
name := "e2e-fail-" + t.Name()
carrier.Register(name, func(context.Context, carrier.Config) (carrier.Session, error) {
return nil, errFailoverCarrier
})
return name
}
func builtInCarrierNames() []string {
return []string{"jazz", "telemost", "wbstream", "jitsi"} //nolint:goconst // test literal, repetition is intentional
}
func builtInTransportNames() []string {
return []string{transportData, transportVideo, transportSEI, transportVP8}
}
//nolint:cyclop // matrix of carrier×transport expectations is naturally branchy
func realE2ECaseExpectation(carrierName, transportName string) realE2EExpectation {
switch carrierName {
case "telemost":
switch transportName {
case transportVP8:
return realE2EExpectPass
case transportVideo:
return realE2EExpectPass
default:
return realE2EExpectFail
}
case "wbstream":
if transportName == transportData {
return realE2EExpectFail
}
return realE2EExpectPass
case "jazz":
if transportName == transportData {
return realE2EExpectPass
}
return realE2EExpectFail
case "jitsi":
// Jitsi colibri-ws bridge channel maps cleanly onto the
// datachannel transport (raw bytes broadcast through
// EndpointMessage). Video transports go through pion's
// PeerConnection negotiated via Jingle session-accept.
//
// seichannel is marked Unstable: SEI NAL data piggybacks on
// the H.264 video stream, and Jicofo's bandwidth allocator
// for self-hosted Jitsi instances (e.g. meet.cryptopro.ru)
// periodically suppresses the video upstream when there's
// no obvious viewer demand, which manifests as recurring
// "seichannel ack timeout" against an otherwise healthy
// PeerConnection. The transport works in steady state but
// is not deterministic enough to gate CI on; flag it but
// don't fail the suite when it flaps.
if transportName == transportSEI {
return realE2EExpectUnstable
}
return realE2EExpectPass
default:
return realE2EExpectPass
}
}
func realE2EExpectationLabel(expectation realE2EExpectation) string {
switch expectation {
case realE2EExpectPass:
return "SUCCESS"
case realE2EExpectFail:
return "EXPECTED FAIL"
case realE2EExpectUnstable:
return "UNSTABLE"
default:
return "UNKNOWN"
}
}
// logUnstableOutcome records the result of an Unstable matrix entry
// without failing the test. Unstable combos exist to keep the matrix
// honest about transports that flap against a particular carrier
// (e.g. seichannel against meet.cryptopro.ru's bandwidth allocator)
// while still surfacing whether the run happened to pass or fail.
func logUnstableOutcome(t *testing.T, label, carrierName, transportName string, err error) {
t.Helper()
if err == nil {
t.Logf("%s PASS %s/%s", label, carrierName, transportName)
return
}
t.Logf("%s FAIL %s/%s: %v", label, carrierName, transportName, err)
}
func TestRealE2ECaseExpectation(t *testing.T) {
tests := []struct {
name string
carrier string
transport string
want realE2EExpectation
}{
{
name: "jazz datachannel is expected to pass",
carrier: "jazz",
transport: transportData,
want: realE2EExpectPass,
},
{
name: "jazz videochannel is expected to fail",
carrier: "jazz",
transport: transportVideo,
want: realE2EExpectFail,
},
{
name: "jazz seichannel is expected to fail",
carrier: "jazz",
transport: transportSEI,
want: realE2EExpectFail,
},
{
name: "jazz vp8channel is expected to fail",
carrier: "jazz",
transport: transportVP8,
want: realE2EExpectFail,
},
{
name: "telemost datachannel is expected to fail",
carrier: "telemost",
transport: transportData,
want: realE2EExpectFail,
},
{
name: "telemost vp8channel is expected to pass",
carrier: "telemost",
transport: transportVP8,
want: realE2EExpectPass,
},
{
name: "wbstream datachannel is expected to fail",
carrier: "wbstream",
transport: transportData,
want: realE2EExpectFail,
},
{
name: "jitsi datachannel is expected to pass",
carrier: "jitsi",
transport: transportData,
want: realE2EExpectPass,
},
{
name: "jitsi vp8channel is expected to pass",
carrier: "jitsi",
transport: transportVP8,
want: realE2EExpectPass,
},
{
name: "jitsi videochannel is expected to pass",
carrier: "jitsi",
transport: transportVideo,
want: realE2EExpectPass,
},
{
name: "jitsi seichannel is unstable",
carrier: "jitsi",
transport: transportSEI,
want: realE2EExpectUnstable,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := realE2ECaseExpectation(tt.carrier, tt.transport); got != tt.want {
t.Fatalf("realE2ECaseExpectation(%q, %q) = %v, want %v", tt.carrier, tt.transport, got, tt.want)
}
})
}
}
func splitTestList(value string) []string {
parts := strings.Split(value, ",")
items := make([]string, 0, len(parts))
for _, part := range parts {
part = strings.TrimSpace(part)
if part != "" {
items = append(items, part)
}
}
return items
}
//nolint:cyclop // table-driven test naturally has many branches
func realRoomURL(ctx context.Context, t *testing.T, carrierName string) string {
t.Helper()
switch carrierName {
case "jazz":
if *realE2EJazzRoom != "" {
return *realE2EJazzRoom
}
room, err := authSaluteJazz.Provider{}.CreateRoom(ctx, auth.Config{Name: "olcrtc-e2e-room"})
if err != nil {
t.Skipf("skip jazz real e2e: create room failed: %v", err)
}
return room
case "telemost":
room := *realE2ETelemostRoom
if room != "" && !strings.HasPrefix(room, "http://") && !strings.HasPrefix(room, "https://") {
room = "https://telemost.yandex.ru/j/" + room
}
return room
case "wbstream":
if *realE2EWBStreamRoom != "" {
return *realE2EWBStreamRoom
}
room, err := authWBStream.Provider{}.CreateRoom(ctx, auth.Config{Name: "olcrtc-e2e-room"})
if err != nil {
t.Skipf("skip wbstream real e2e: create room failed: %v", err)
}
return room
case "jitsi":
// Jitsi has no notion of "creating" a room — names are conjured
// on first join. The default flag points at meet.cryptopro.ru
// (a CryptoPro-operated public Jitsi instance) with a fixed
// room slug so the server and client land in the same MUC.
_ = ctx
room := *realE2EJitsiRoom
if room == "" {
t.Skip("skip jitsi real e2e: empty -olcrtc.real-jitsi-room")
}
return room
default:
return ""
}
}
func requireRealRoom(ctx context.Context, t *testing.T, carrierName string) string {
t.Helper()
roomURL := realRoomURL(ctx, t, carrierName)
if roomURL == "" {
t.Fatalf("missing room for %s", carrierName)
}
return roomURL
}
func validSessionConfig(mode, carrierName, transportName string) session.Config {
return session.Config{
Mode: mode,
Link: linkDirect,
Transport: transportName,
Auth: carrierName,
RoomID: testRoom,
KeyHex: testKeyHex,
SOCKSHost: "127.0.0.1",
SOCKSPort: 1080,
DNSServer: localDNSServer,
VideoWidth: 1080,
VideoHeight: 1080,
VideoFPS: 30,
VideoBitrate: "1M",
VideoHW: videoHWNone,
VideoCodec: "tile",
VideoTileModule: 4,
VideoTileRS: 20,
VP8FPS: 60,
VP8BatchSize: 8,
SEIFPS: 30,
SEIBatchSize: 4,
SEIFragmentSize: 512,
SEIAckTimeoutMS: 1500,
}
}
func validLinkConfig(carrierName, transportName string) link.Config {
cfg := validSessionConfig("cnc", carrierName, transportName)
return link.Config{
Transport: cfg.Transport,
Carrier: cfg.Auth,
RoomURL: testRoom,
DeviceID: "e2e-link-test",
Name: "e2e-" + carrierName + "-" + transportName,
DNSServer: cfg.DNSServer,
VideoWidth: cfg.VideoWidth,
VideoHeight: cfg.VideoHeight,
VideoFPS: cfg.VideoFPS,
VideoBitrate: cfg.VideoBitrate,
VideoHW: cfg.VideoHW,
VideoCodec: cfg.VideoCodec,
VideoTileModule: cfg.VideoTileModule,
VideoTileRS: cfg.VideoTileRS,
VP8FPS: cfg.VP8FPS,
VP8BatchSize: cfg.VP8BatchSize,
SEIFPS: cfg.SEIFPS,
SEIBatchSize: cfg.SEIBatchSize,
SEIFragmentSize: cfg.SEIFragmentSize,
SEIAckTimeoutMS: cfg.SEIAckTimeoutMS,
}
}
func startEchoServer(t *testing.T) string {
t.Helper()
var lc net.ListenConfig
ln, err := lc.Listen(context.Background(), "tcp4", "127.0.0.1:0")
if err != nil {
t.Fatalf("listen echo: %v", err)
}
t.Cleanup(func() { _ = ln.Close() })
go func() {
for {
conn, err := ln.Accept()
if err != nil {
return
}
go func() {
defer func() { _ = conn.Close() }()
_, _ = io.Copy(conn, conn)
}()
}
}()
return ln.Addr().String()
}
func freeLocalAddr(ctx context.Context, t *testing.T) string {
t.Helper()
var lc net.ListenConfig
ln, err := lc.Listen(ctx, "tcp4", "127.0.0.1:0")
if err != nil {
t.Fatalf("reserve local addr: %v", err)
}
addr := ln.Addr().String()
if err := ln.Close(); err != nil {
t.Fatalf("close reserved addr: %v", err)
}
return addr
}
func waitForReady(t *testing.T, ready <-chan struct{}) {
t.Helper()
select {
case <-ready:
case <-time.After(3 * time.Second):
t.Fatal("client did not become ready")
}
}
type tunnelRuntime struct {
socksAddr string
room *memoryRoom
cancel context.CancelFunc
serverErr chan error
clientErr chan error
stopWait time.Duration
}
func startTunnel(t *testing.T) *tunnelRuntime {
t.Helper()
carrierName, room := registerMemoryCarrier(t)
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)
socksAddr := freeLocalAddr(ctx, t)
serverErr := make(chan error, 1)
go func() {
serverErr <- server.Run(ctx, server.Config{
Link: linkDirect,
Transport: transportData,
Carrier: carrierName,
RoomURL: testRoom,
KeyHex: testKeyHex,
DNSServer: localDNSServer,
})
}()
room.waitConnected(t, 1)
ready := make(chan struct{})
clientErr := make(chan error, 1)
go func() {
clientErr <- client.RunWithReady(ctx, client.Config{
Link: linkDirect,
Transport: transportData,
Carrier: carrierName,
RoomURL: testRoom,
KeyHex: testKeyHex,
DeviceID: testClientDeviceID,
LocalAddr: socksAddr,
DNSServer: localDNSServer,
}, func() { close(ready) })
}()
waitForReady(t, ready)
return &tunnelRuntime{
socksAddr: socksAddr,
room: room,
cancel: cancel,
serverErr: serverErr,
clientErr: clientErr,
stopWait: 3 * time.Second,
}
}
func startRealTunnel(
ctx context.Context,
t *testing.T,
carrierName, transportName, roomURL, _, clientDeviceID string,
) (*tunnelRuntime, error) {
t.Helper()
session.RegisterDefaults()
socksAddr := freeLocalAddr(ctx, t)
runCtx, cancel := context.WithCancel(ctx)
t.Cleanup(cancel)
serverErr := make(chan error, 1)
go func() {
serverErr <- server.Run(runCtx, server.Config{
Link: linkDirect,
Transport: transportName,
Carrier: carrierName,
RoomURL: roomURL,
KeyHex: testKeyHex,
DNSServer: localDNSServer,
VideoWidth: 1080,
VideoHeight: 1080,
VideoFPS: 60,
VideoBitrate: "5000k",
VideoHW: videoHWNone,
VideoQRSize: 512,
VideoQRRecovery: "low",
VideoCodec: "qrcode",
VideoTileModule: 4,
VideoTileRS: 20,
VP8FPS: 60,
VP8BatchSize: 8,
SEIFPS: 30,
SEIBatchSize: 4,
SEIFragmentSize: 512,
SEIAckTimeoutMS: 1500,
})
}()
select {
case err := <-serverErr:
cancel()
return nil, fmt.Errorf("server exited before client start: %w", err)
case <-time.After(2 * time.Second):
case <-runCtx.Done():
cancel()
return nil, fmt.Errorf("server context ended before client start: %w", runCtx.Err())
}
ready := make(chan struct{})
clientErr := make(chan error, 1)
go func() {
clientErr <- client.RunWithReady(runCtx, client.Config{
Link: linkDirect,
Transport: transportName,
Carrier: carrierName,
RoomURL: roomURL,
KeyHex: testKeyHex,
DeviceID: clientDeviceID,
LocalAddr: socksAddr,
DNSServer: localDNSServer,
VideoWidth: 1080,
VideoHeight: 1080,
VideoFPS: 60,
VideoBitrate: "5000k",
VideoHW: videoHWNone,
VideoQRSize: 512,
VideoQRRecovery: "low",
VideoCodec: "qrcode",
VideoTileModule: 4,
VideoTileRS: 20,
VP8FPS: 60,
VP8BatchSize: 8,
SEIFPS: 30,
SEIBatchSize: 4,
SEIFragmentSize: 512,
SEIAckTimeoutMS: 1500,
}, func() { close(ready) })
}()
select {
case <-ready:
case err := <-clientErr:
cancel()
return nil, fmt.Errorf("client exited before ready: %w", err)
case err := <-serverErr:
cancel()
return nil, fmt.Errorf("server exited before client ready: %w", err)
case <-time.After(*realE2ETimeout):
cancel()
return nil, errRealE2ENotReady
case <-runCtx.Done():
cancel()
return nil, fmt.Errorf("real e2e context ended before ready: %w", runCtx.Err())
}
return &tunnelRuntime{
socksAddr: socksAddr,
cancel: cancel,
serverErr: serverErr,
clientErr: clientErr,
stopWait: 20 * time.Second,
}, nil
}
func (r *tunnelRuntime) stop(t *testing.T) {
t.Helper()
if err := r.stopErr(); err != nil {
t.Fatal(err)
}
}
func (r *tunnelRuntime) waitStopped(t *testing.T) {
t.Helper()
if err := r.waitStoppedErr(); err != nil {
t.Fatal(err)
}
}
func (r *tunnelRuntime) stopErr() error {
r.cancel()
return r.waitStoppedErr()
}
func (r *tunnelRuntime) waitStoppedErr() error {
stopWait := r.stopWait
if stopWait <= 0 {
stopWait = 3 * time.Second
}
for _, item := range []struct {
name string
ch <-chan error
}{
{name: "client", ch: r.clientErr},
{name: "server", ch: r.serverErr},
} {
select {
case err := <-item.ch:
if err != nil {
return fmt.Errorf("%s returned error: %w", item.name, err)
}
case <-time.After(stopWait):
return fmt.Errorf("%w: %s", errTunnelDidNotStop, item.name)
}
}
return nil
}
func connectViaSOCKS(t *testing.T, socksAddr, targetAddr string) net.Conn {
t.Helper()
dialer := net.Dialer{Timeout: 2 * time.Second}
conn, err := dialer.DialContext(context.Background(), "tcp4", socksAddr)
if err != nil {
t.Fatalf("dial socks: %v", err)
}
if _, err := conn.Write([]byte{5, 1, 0}); err != nil {
_ = conn.Close()
t.Fatalf("write socks greeting: %v", err)
}
greeting := make([]byte, 2)
if _, err := io.ReadFull(conn, greeting); err != nil {
_ = conn.Close()
t.Fatalf("read socks greeting: %v", err)
}
if !bytes.Equal(greeting, []byte{5, 0}) {
_ = conn.Close()
t.Fatalf("socks greeting = %v, want [5 0]", greeting)
}
host, portText, err := net.SplitHostPort(targetAddr)
if err != nil {
_ = conn.Close()
t.Fatalf("split target addr: %v", err)
}
port, err := strconv.Atoi(portText)
if err != nil {
_ = conn.Close()
t.Fatalf("parse target port: %v", err)
}
req := make([]byte, 0, 10)
req = append(req, 5, 1, 0, 1)
req = append(req, net.ParseIP(host).To4()...)
var portBuf [2]byte
binary.BigEndian.PutUint16(portBuf[:], uint16(port)) //nolint:gosec // SOCKS5 port is uint16 by definition
req = append(req, portBuf[:]...)
if _, err := conn.Write(req); err != nil {
_ = conn.Close()
t.Fatalf("write socks connect: %v", err)
}
reply := make([]byte, 10)
if _, err := io.ReadFull(conn, reply); err != nil {
_ = conn.Close()
t.Fatalf("read socks connect reply: %v", err)
}
if !bytes.Equal(reply, []byte{5, 0, 0, 1, 0, 0, 0, 0, 0, 0}) {
_ = conn.Close()
t.Fatalf("socks reply = %v, want success", reply)
}
return conn
}
func TestBuiltInProviderTransportMatrixValidates(t *testing.T) {
session.RegisterDefaults()
for _, mode := range []string{"srv", "cnc"} {
t.Run(mode, func(t *testing.T) {
for _, carrierName := range builtInCarrierNames() {
t.Run(carrierName, func(t *testing.T) {
for _, transportName := range builtInTransportNames() {
t.Run(transportName, func(t *testing.T) {
cfg := validSessionConfig(mode, carrierName, transportName)
if err := session.Validate(cfg); err != nil {
t.Fatalf("Validate() error = %v", err)
}
})
}
})
}
})
}
}
func TestDirectLinkCreatesAllProviderTransportCombinations(t *testing.T) {
session.RegisterDefaults()
for _, carrierName := range builtInCarrierNames() {
registerMemoryCarrierAs(t, carrierName)
}
for _, carrierName := range builtInCarrierNames() {
t.Run(carrierName, func(t *testing.T) {
for _, transportName := range builtInTransportNames() {
t.Run(transportName, func(t *testing.T) {
ln, err := link.New(context.Background(), linkDirect, validLinkConfig(carrierName, transportName))
if err != nil {
t.Fatalf("link.New() error = %v", err)
}
if err := ln.Close(); err != nil {
t.Fatalf("Close() error = %v", err)
}
})
}
})
}
}
func TestDirectLinkConnectsFastProviderTransportMatrix(t *testing.T) {
session.RegisterDefaults()
for _, carrierName := range builtInCarrierNames() {
registerMemoryCarrierAs(t, carrierName)
}
for _, carrierName := range builtInCarrierNames() {
t.Run(carrierName, func(t *testing.T) {
for _, transportName := range []string{transportData, transportSEI} {
t.Run(transportName, func(t *testing.T) {
ln, err := link.New(context.Background(), linkDirect, validLinkConfig(carrierName, transportName))
if err != nil {
t.Fatalf("link.New() error = %v", err)
}
if err := ln.Connect(context.Background()); err != nil {
t.Fatalf("Connect() error = %v", err)
}
if !ln.CanSend() {
t.Fatal("CanSend() = false, want true")
}
if err := ln.Close(); err != nil {
t.Fatalf("Close() error = %v", err)
}
})
}
})
}
}
//nolint:cyclop // table-driven test naturally has many branches
func TestRealProviderTransportMatrix(t *testing.T) {
if !*realE2E {
t.Skip("real provider e2e disabled; pass -olcrtc.real-e2e with provider room flags")
}
carriers := splitTestList(*realE2ECarriers)
transports := splitTestList(*realE2ETransports)
if len(carriers) == 0 {
t.Fatal("no real e2e carriers selected")
}
if len(transports) == 0 {
t.Fatal("no real e2e transports selected")
}
echoAddr := startEchoServer(t)
for _, carrierName := range carriers {
t.Run(carrierName, func(t *testing.T) {
roomCtx, cancelRoom := context.WithTimeout(context.Background(), *realE2ETimeout)
defer cancelRoom()
roomURL := requireRealRoom(roomCtx, t, carrierName)
var authFailed bool
for _, transportName := range transports {
t.Run(transportName, func(t *testing.T) {
if authFailed {
t.Skip("skipping: carrier auth failed on previous transport")
}
expectation := realE2ECaseExpectation(carrierName, transportName)
label := realE2EExpectationLabel(expectation)
err := runRealE2ECase(t, carrierName, transportName, roomURL, echoAddr)
if err != nil && errors.Is(err, carrier.ErrAuthFailed) {
authFailed = true
t.Skipf("skip %s real e2e: auth failed: %v", carrierName, err)
}
switch {
case err == nil && expectation == realE2EExpectPass:
t.Logf("%s %s/%s", label, carrierName, transportName)
case err == nil && expectation == realE2EExpectFail:
t.Fatalf("UNEXPECTED SUCCESS %s/%s", carrierName, transportName)
case err != nil && expectation == realE2EExpectPass:
t.Fatalf("EXPECTED SUCCESS %s/%s failed: %v", carrierName, transportName, err)
case err != nil && expectation == realE2EExpectFail:
t.Logf("%s %s/%s: %v", label, carrierName, transportName, err)
case expectation == realE2EExpectUnstable:
logUnstableOutcome(t, label, carrierName, transportName, err)
}
})
}
})
}
}
func runRealE2ECase(t *testing.T, carrierName, transportName, roomURL, echoAddr string) (err error) {
t.Helper()
ctx, cancel := context.WithTimeout(context.Background(), *realE2ETimeout)
defer cancel()
rt, err := startRealTunnel(ctx, t, carrierName, transportName, roomURL, testClientDeviceID, testClientDeviceID)
if err != nil {
return err
}
defer func() {
if stopErr := rt.stopErr(); err == nil && stopErr != nil {
err = stopErr
}
}()
conn, err := connectViaSOCKSWithin(rt.socksAddr, echoAddr, *realE2ETimeout)
if err != nil {
return err
}
defer func() { _ = conn.Close() }()
payload := []byte("olcrtc-real-e2e-" + carrierName + "-" + transportName + "\n")
if _, err := conn.Write(payload); err != nil {
return fmt.Errorf("write real e2e payload: %w", err)
}
if err := conn.SetReadDeadline(time.Now().Add(*realE2ETimeout)); err != nil {
return fmt.Errorf("set real e2e read deadline: %w", err)
}
line, err := bufio.NewReader(conn).ReadBytes('\n')
if err != nil {
return fmt.Errorf("read real e2e echo: %w", err)
}
if !bytes.Equal(line, payload) {
return fmt.Errorf("%w: got %q, want %q", errRealE2EEchoMismatch, line, payload)
}
return nil
}
func TestClientServerSOCKSTunnelOverMemoryDatachannel(t *testing.T) {
echoAddr := startEchoServer(t)
rt := startTunnel(t)
defer rt.stop(t)
conn := connectViaSOCKS(t, rt.socksAddr, echoAddr)
defer func() { _ = conn.Close() }()
payload := []byte("olcrtc-e2e-payload\n")
if _, err := conn.Write(payload); err != nil {
t.Fatalf("write tunneled payload: %v", err)
}
if err := conn.SetReadDeadline(time.Now().Add(3 * time.Second)); err != nil {
t.Fatalf("set read deadline: %v", err)
}
line, err := bufio.NewReader(conn).ReadBytes('\n')
if err != nil {
t.Fatalf("read tunneled echo: %v", err)
}
if !bytes.Equal(line, payload) {
t.Fatalf("echo = %q, want %q", line, payload)
}
}
func TestFrequentReconnectsStillAllowNewSOCKSConnections(t *testing.T) {
echoAddr := startEchoServer(t)
rt := startTunnel(t)
defer rt.stop(t)
for i := range 5 {
rt.room.triggerReconnect()
conn := eventuallyConnectViaSOCKS(t, rt.socksAddr, echoAddr)
payload := fmt.Appendf(nil, "after-reconnect-%d\n", i)
if _, err := conn.Write(payload); err != nil {
_ = conn.Close()
t.Fatalf("write after reconnect %d: %v", i, err)
}
if err := conn.SetReadDeadline(time.Now().Add(3 * time.Second)); err != nil {
_ = conn.Close()
t.Fatalf("set deadline after reconnect %d: %v", i, err)
}
line, err := bufio.NewReader(conn).ReadBytes('\n')
_ = conn.Close()
if err != nil {
t.Fatalf("read after reconnect %d: %v", i, err)
}
if !bytes.Equal(line, payload) {
t.Fatalf("echo after reconnect %d = %q, want %q", i, line, payload)
}
}
}
func TestSupervisorFailoverProfilesReachWorkingSOCKS(t *testing.T) {
echoAddr := startEchoServer(t)
failingCarrier := registerFailingCarrier(t)
memoryCarrier, room := registerMemoryCarrier(t)
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)
socksAddr := freeLocalAddr(ctx, t)
socksHost, socksPort := splitHostPort(t, socksAddr)
serverProfiles := []supervisor.Profile{
{Name: "failing-server", Config: failoverSessionConfig("srv", failingCarrier, "", 0)},
{Name: "memory-server", Config: failoverSessionConfig("srv", memoryCarrier, "", 0)},
}
clientProfiles := []supervisor.Profile{
{Name: "failing-client", Config: failoverSessionConfig("cnc", failingCarrier, socksHost, socksPort)},
{Name: "memory-client", Config: failoverSessionConfig("cnc", memoryCarrier, socksHost, socksPort)},
}
started := make(chan string, 8)
serverErr := make(chan error, 1)
go func() {
serverErr <- supervisor.Run(ctx, failoverE2EConfig(serverProfiles, started, "server"), session.Run)
}()
room.waitConnected(t, 1)
ready := make(chan struct{})
var readyOnce sync.Once
clientErr := make(chan error, 1)
go func() {
clientErr <- supervisor.Run(ctx, failoverE2EConfig(clientProfiles, started, "client"), func(ctx context.Context, cfg session.Config) error {
return client.RunWithReady(ctx, clientConfigFromSession(cfg, socksAddr), func() {
if cfg.Auth == memoryCarrier {
readyOnce.Do(func() { close(ready) })
}
})
})
}()
waitForReady(t, ready)
conn := eventuallyConnectViaSOCKS(t, socksAddr, echoAddr)
defer func() { _ = conn.Close() }()
payload := []byte("olcrtc-failover-e2e\n")
if _, err := conn.Write(payload); err != nil {
t.Fatalf("write failover payload: %v", err)
}
if err := conn.SetReadDeadline(time.Now().Add(3 * time.Second)); err != nil {
t.Fatalf("set failover read deadline: %v", err)
}
line, err := bufio.NewReader(conn).ReadBytes('\n')
if err != nil {
t.Fatalf("read failover echo: %v", err)
}
if !bytes.Equal(line, payload) {
t.Fatalf("failover echo = %q, want %q", line, payload)
}
requireStartedProfiles(t, started, []string{
"server:failing-server",
"server:memory-server",
"client:failing-client",
"client:memory-client",
})
cancel()
waitSupervisorStopped(t, "client", clientErr)
waitSupervisorStopped(t, "server", serverErr)
}
func failoverSessionConfig(mode, carrierName, socksHost string, socksPort int) session.Config {
cfg := session.Config{
Mode: mode,
Link: linkDirect,
Transport: transportData,
Auth: carrierName,
RoomID: testRoom,
KeyHex: testKeyHex,
DNSServer: localDNSServer,
}
if mode == "cnc" {
cfg.SOCKSHost = socksHost
cfg.SOCKSPort = socksPort
}
return cfg
}
func clientConfigFromSession(cfg session.Config, socksAddr string) client.Config {
return client.Config{
Link: cfg.Link,
Transport: cfg.Transport,
Carrier: cfg.Auth,
RoomURL: cfg.RoomID,
KeyHex: cfg.KeyHex,
LocalAddr: socksAddr,
DNSServer: cfg.DNSServer,
DeviceID: testClientDeviceID,
VideoWidth: cfg.VideoWidth,
VideoHeight: cfg.VideoHeight,
VideoFPS: cfg.VideoFPS,
VideoBitrate: cfg.VideoBitrate,
VideoHW: cfg.VideoHW,
VideoQRSize: cfg.VideoQRSize,
VideoQRRecovery: cfg.VideoQRRecovery,
VideoCodec: cfg.VideoCodec,
VideoTileModule: cfg.VideoTileModule,
VideoTileRS: cfg.VideoTileRS,
VP8FPS: cfg.VP8FPS,
VP8BatchSize: cfg.VP8BatchSize,
SEIFPS: cfg.SEIFPS,
SEIBatchSize: cfg.SEIBatchSize,
SEIFragmentSize: cfg.SEIFragmentSize,
SEIAckTimeoutMS: cfg.SEIAckTimeoutMS,
Engine: cfg.Engine,
URL: cfg.URL,
Token: cfg.Token,
}
}
func failoverE2EConfig(
profiles []supervisor.Profile,
started chan<- string,
side string,
) supervisor.Config {
return supervisor.Config{
Profiles: profiles,
RetryDelay: time.Millisecond,
OnProfileStart: func(profile supervisor.Profile, _ int) {
select {
case started <- side + ":" + profile.Name:
default:
}
},
}
}
func splitHostPort(t *testing.T, addr string) (string, int) {
t.Helper()
host, portText, err := net.SplitHostPort(addr)
if err != nil {
t.Fatalf("split host port %q: %v", addr, err)
}
port, err := strconv.Atoi(portText)
if err != nil {
t.Fatalf("parse port %q: %v", portText, err)
}
return host, port
}
func requireStartedProfiles(t *testing.T, started <-chan string, want []string) {
t.Helper()
seen := make(map[string]bool)
deadline := time.After(3 * time.Second)
for len(seen) < len(want) {
select {
case item := <-started:
seen[item] = true
case <-deadline:
t.Fatalf("started profiles = %v, want all %v", seen, want)
}
}
for _, item := range want {
if !seen[item] {
t.Fatalf("started profiles = %v, missing %s", seen, item)
}
}
}
func waitSupervisorStopped(t *testing.T, name string, ch <-chan error) {
t.Helper()
select {
case err := <-ch:
if err != nil {
t.Fatalf("%s supervisor returned error: %v", name, err)
}
case <-time.After(3 * time.Second):
t.Fatalf("%s supervisor did not stop", name)
}
}
func TestEndedCallbackStopsClientAndServer(t *testing.T) {
rt := startTunnel(t)
rt.room.triggerEnded("conference ended")
rt.waitStopped(t)
}
func eventuallyConnectViaSOCKS(t *testing.T, socksAddr, targetAddr string) net.Conn {
t.Helper()
return eventuallyConnectViaSOCKSWithin(t, socksAddr, targetAddr, 3*time.Second)
}
func eventuallyConnectViaSOCKSWithin(t *testing.T, socksAddr, targetAddr string, timeout time.Duration) net.Conn {
t.Helper()
conn, err := connectViaSOCKSWithin(socksAddr, targetAddr, timeout)
if err != nil {
t.Fatal(err)
}
return conn
}
func connectViaSOCKSWithin(socksAddr, targetAddr string, timeout time.Duration) (net.Conn, error) {
deadline := time.Now().Add(timeout)
var lastErr error
attempt := 0
for time.Now().Before(deadline) {
conn, err := tryConnectViaSOCKS(socksAddr, targetAddr)
if err == nil {
return conn, nil
}
lastErr = err
attempt++
sleep := 250 * time.Millisecond
if attempt > 3 {
sleep = time.Second
}
time.Sleep(sleep)
}
return nil, fmt.Errorf("connect via SOCKS failed after %s: %w", timeout, lastErr)
}
func tryConnectViaSOCKS(socksAddr, targetAddr string) (net.Conn, error) {
dialer := net.Dialer{Timeout: 500 * time.Millisecond}
conn, err := dialer.DialContext(context.Background(), "tcp4", socksAddr)
if err != nil {
return nil, fmt.Errorf("dial socks: %w", err)
}
if _, err := conn.Write([]byte{5, 1, 0}); err != nil {
_ = conn.Close()
return nil, fmt.Errorf("write greeting: %w", err)
}
greeting := make([]byte, 2)
if _, err := io.ReadFull(conn, greeting); err != nil {
_ = conn.Close()
return nil, fmt.Errorf("read greeting: %w", err)
}
if !bytes.Equal(greeting, []byte{5, 0}) {
_ = conn.Close()
return nil, fmt.Errorf("%w: %v", errSocksUnexpectedHello, greeting)
}
host, portText, err := net.SplitHostPort(targetAddr)
if err != nil {
_ = conn.Close()
return nil, fmt.Errorf("split host port: %w", err)
}
port, err := strconv.Atoi(portText)
if err != nil {
_ = conn.Close()
return nil, fmt.Errorf("parse port: %w", err)
}
req := make([]byte, 0, 10)
req = append(req, 5, 1, 0, 1)
req = append(req, net.ParseIP(host).To4()...)
var portBuf [2]byte
binary.BigEndian.PutUint16(portBuf[:], uint16(port)) //nolint:gosec // SOCKS5 port is uint16 by definition
req = append(req, portBuf[:]...)
if _, err := conn.Write(req); err != nil {
_ = conn.Close()
return nil, fmt.Errorf("write connect request: %w", err)
}
reply := make([]byte, 10)
if _, err := io.ReadFull(conn, reply); err != nil {
_ = conn.Close()
return nil, fmt.Errorf("read connect reply: %w", err)
}
if !bytes.Equal(reply, []byte{5, 0, 0, 1, 0, 0, 0, 0, 0, 0}) {
_ = conn.Close()
return nil, fmt.Errorf("%w: %v", errSocksUnexpectedReply, reply)
}
return conn, nil
}
func TestLargeTransferOverTunnel(t *testing.T) {
echoAddr := startEchoServer(t)
rt := startTunnel(t)
defer rt.stop(t)
size := int64(32 << 20)
conn := connectViaSOCKS(t, rt.socksAddr, echoAddr)
defer func() { _ = conn.Close() }()
if err := streamPatternAndVerifyEcho(conn, size); err != nil {
t.Fatalf("large transfer %d bytes failed: %v", size, err)
}
}
func streamPatternAndVerifyEcho(conn net.Conn, size int64) error {
errCh := make(chan error, 1)
go func() {
buf := make([]byte, 32*1024)
var written int64
for written < size {
n := len(buf)
if remaining := size - written; remaining < int64(n) {
n = int(remaining)
}
fillPattern(buf[:n], written)
if _, err := conn.Write(buf[:n]); err != nil {
errCh <- fmt.Errorf("write at %d: %w", written, err)
return
}
written += int64(n)
}
errCh <- nil
}()
buf := make([]byte, 32*1024)
want := make([]byte, len(buf))
var read int64
for read < size {
n := len(buf)
if remaining := size - read; remaining < int64(n) {
n = int(remaining)
}
if err := conn.SetReadDeadline(time.Now().Add(10 * time.Second)); err != nil {
return fmt.Errorf("set read deadline: %w", err)
}
if _, err := io.ReadFull(conn, buf[:n]); err != nil {
return fmt.Errorf("read at %d: %w", read, err)
}
fillPattern(want[:n], read)
if !bytes.Equal(buf[:n], want[:n]) {
return fmt.Errorf("%w %d", errPayloadMismatchOffset, read)
}
read += int64(n)
}
if err := <-errCh; err != nil {
return err
}
return nil
}
func fillPattern(buf []byte, offset int64) {
for i := range buf {
buf[i] = byte((offset + int64(i)*31 + 7) & 0xff)
}
}