Files
olcrtc/internal/e2e/tunnel_test.go
2026-05-15 15:37:58 +03:00

1294 lines
33 KiB
Go

package e2e
import (
"bufio"
"bytes"
"context"
"encoding/binary"
"errors"
"flag"
"fmt"
"io"
"net"
"strconv"
"strings"
"sync"
"testing"
"time"
"github.com/openlibrecommunity/olcrtc/internal/app/session"
"github.com/openlibrecommunity/olcrtc/internal/auth"
authSaluteJazz "github.com/openlibrecommunity/olcrtc/internal/auth/salutejazz"
authWBStream "github.com/openlibrecommunity/olcrtc/internal/auth/wbstream"
"github.com/openlibrecommunity/olcrtc/internal/carrier"
"github.com/openlibrecommunity/olcrtc/internal/client"
"github.com/openlibrecommunity/olcrtc/internal/link"
"github.com/openlibrecommunity/olcrtc/internal/server"
"github.com/pion/webrtc/v4"
)
const (
testKeyHex = "00112233445566778899aabbccddeeff00112233445566778899aabbccddeeff"
transportData = "datachannel"
transportVideo = "videochannel"
transportSEI = "seichannel"
transportVP8 = "vp8channel"
linkDirect = "direct"
testRoom = "room"
localDNSServer = "127.0.0.1:53"
videoHWNone = "none"
testClientDeviceID = "client-1"
)
var (
errRealE2ENotReady = errors.New("real e2e client did not become ready")
errTunnelDidNotStop = errors.New("tunnel goroutine did not stop")
errRealE2EEchoMismatch = errors.New("real e2e echo payload mismatch")
errSocksUnexpectedReply = errors.New("unexpected SOCKS5 reply")
errSocksUnexpectedHello = errors.New("unexpected SOCKS5 greeting")
errPayloadMismatchOffset = errors.New("payload mismatch at offset")
)
var (
realE2E = flag.Bool( //nolint:gochecknoglobals // package-level state intentional
"olcrtc.real-e2e",
false,
"run real provider e2e matrix against external WebRTC services",
)
realE2ECarriers = flag.String( //nolint:gochecknoglobals // package-level state intentional
"olcrtc.real-carriers",
"telemost,wbstream",
"comma-separated carriers for real e2e",
)
realE2ETransports = flag.String( //nolint:gochecknoglobals // package-level state intentional
"olcrtc.real-transports",
"datachannel,videochannel,seichannel,vp8channel",
"comma-separated transports for real e2e",
)
realE2EJazzRoom = flag.String( //nolint:gochecknoglobals // package-level state intentional
"olcrtc.real-jazz-room",
"",
"SaluteJazz room for real e2e, format roomID:password; autogenerated when empty",
)
realE2ETelemostRoom = flag.String( //nolint:gochecknoglobals // package-level state intentional
"olcrtc.real-telemost-room",
"41514917109506",
"Telemost room URL or id for real e2e",
)
realE2EWBStreamRoom = flag.String( //nolint:gochecknoglobals // package-level state intentional
"olcrtc.real-wbstream-room",
"019e23c2-a580-7550-b08a-7ac5342ca21f",
"WB Stream room id for real e2e; autogenerated when empty",
)
realE2EJitsiRoom = flag.String( //nolint:gochecknoglobals // package-level state intentional
"olcrtc.real-jitsi-room",
"https://meet.cryptopro.ru/deadbeef",
"Jitsi Meet room URL for real e2e (format https://host/room or host/room)",
)
realE2ETimeout = flag.Duration( //nolint:gochecknoglobals // package-level state intentional
"olcrtc.real-timeout",
90*time.Second,
"timeout per real e2e provider/transport case",
)
)
type realE2EExpectation int
const (
realE2EExpectFail realE2EExpectation = iota
realE2EExpectPass
realE2EBestEffort
)
type memorySession struct {
stream *memoryStream
}
func (s *memorySession) Capabilities() carrier.Capabilities {
return carrier.Capabilities{ByteStream: true, VideoTrack: true}
}
func (s *memorySession) OpenByteStream() (carrier.ByteStream, error) {
return s.stream, nil
}
func (s *memorySession) OpenVideoTrack() (carrier.VideoTrack, error) {
return s.stream, nil
}
type memoryRoom struct {
mu sync.Mutex
streams map[*memoryStream]struct{}
}
func (r *memoryRoom) connectedCount() int {
r.mu.Lock()
defer r.mu.Unlock()
count := 0
for stream := range r.streams {
if stream.isConnected() {
count++
}
}
return count
}
func (r *memoryRoom) waitConnected(t *testing.T, want int) {
t.Helper()
deadline := time.Now().Add(3 * time.Second)
for time.Now().Before(deadline) {
if r.connectedCount() >= want {
return
}
time.Sleep(10 * time.Millisecond)
}
t.Fatalf("memory room connected streams = %d, want at least %d", r.connectedCount(), want)
}
func (r *memoryRoom) triggerReconnect() {
r.mu.Lock()
streams := make([]*memoryStream, 0, len(r.streams))
for stream := range r.streams {
streams = append(streams, stream)
}
r.mu.Unlock()
var wg sync.WaitGroup
for _, stream := range streams {
wg.Add(1)
go func() {
defer wg.Done()
stream.triggerReconnect()
}()
}
wg.Wait()
}
func (r *memoryRoom) triggerEnded(reason string) {
r.mu.Lock()
streams := make([]*memoryStream, 0, len(r.streams))
for stream := range r.streams {
streams = append(streams, stream)
}
r.mu.Unlock()
for _, stream := range streams {
stream.triggerEnded(reason)
}
}
type memoryStream struct {
room *memoryRoom
onData func([]byte)
mu sync.Mutex
connected bool
closed bool
reconnect func()
ended func(string)
track webrtc.TrackLocal
trackCB func(*webrtc.TrackRemote, *webrtc.RTPReceiver)
pending [][]byte
}
func (s *memoryStream) Connect(context.Context) error {
s.mu.Lock()
s.connected = true
pending := s.pending
s.pending = nil
onData := s.onData
s.mu.Unlock()
for _, payload := range pending {
if onData != nil {
onData(payload)
}
}
return nil
}
func (s *memoryStream) Send(data []byte) error {
s.mu.Lock()
if s.closed {
s.mu.Unlock()
return io.ErrClosedPipe
}
s.mu.Unlock()
payload := append([]byte(nil), data...)
s.room.mu.Lock()
peers := make([]*memoryStream, 0, len(s.room.streams))
for peer := range s.room.streams {
if peer != s {
peers = append(peers, peer)
}
}
s.room.mu.Unlock()
for _, peer := range peers {
peer.deliver(payload)
}
return nil
}
func (s *memoryStream) deliver(data []byte) {
s.mu.Lock()
if !s.connected && !s.closed {
s.pending = append(s.pending, append([]byte(nil), data...))
s.mu.Unlock()
return
}
ready := !s.closed && s.onData != nil
onData := s.onData
s.mu.Unlock()
if ready {
onData(append([]byte(nil), data...))
}
}
func (s *memoryStream) Close() error {
s.mu.Lock()
s.closed = true
s.connected = false
s.mu.Unlock()
return nil
}
func (s *memoryStream) SetReconnectCallback(cb func()) {
s.mu.Lock()
s.reconnect = cb
s.mu.Unlock()
}
func (s *memoryStream) SetShouldReconnect(func() bool) {}
func (s *memoryStream) SetEndedCallback(cb func(string)) {
s.mu.Lock()
s.ended = cb
s.mu.Unlock()
}
func (s *memoryStream) WatchConnection(ctx context.Context) {
<-ctx.Done()
}
func (s *memoryStream) CanSend() bool {
return s.isConnected()
}
func (s *memoryStream) AddTrack(track webrtc.TrackLocal) error {
s.mu.Lock()
s.track = track
s.mu.Unlock()
return nil
}
func (s *memoryStream) SetTrackHandler(cb func(*webrtc.TrackRemote, *webrtc.RTPReceiver)) {
s.mu.Lock()
s.trackCB = cb
s.mu.Unlock()
}
func (s *memoryStream) isConnected() bool {
s.mu.Lock()
defer s.mu.Unlock()
return s.connected && !s.closed
}
func (s *memoryStream) triggerReconnect() {
s.mu.Lock()
reconnect := s.reconnect
ready := s.connected && !s.closed && reconnect != nil
s.mu.Unlock()
if ready {
reconnect()
}
}
func (s *memoryStream) triggerEnded(reason string) {
s.mu.Lock()
ended := s.ended
ready := s.connected && !s.closed && ended != nil
s.mu.Unlock()
if ready {
ended(reason)
}
}
func registerMemoryCarrier(t *testing.T) (string, *memoryRoom) {
t.Helper()
session.RegisterDefaults()
name := "e2e-memory-" + t.Name()
room := &memoryRoom{streams: make(map[*memoryStream]struct{})}
carrier.Register(name, func(_ context.Context, cfg carrier.Config) (carrier.Session, error) {
stream := &memoryStream{room: room, onData: cfg.OnData}
room.mu.Lock()
room.streams[stream] = struct{}{}
room.mu.Unlock()
return &memorySession{stream: stream}, nil
})
return name, room
}
func registerMemoryCarrierAs(t *testing.T, name string) {
t.Helper()
room := &memoryRoom{streams: make(map[*memoryStream]struct{})}
carrier.Register(name, func(_ context.Context, cfg carrier.Config) (carrier.Session, error) {
stream := &memoryStream{room: room, onData: cfg.OnData}
room.mu.Lock()
room.streams[stream] = struct{}{}
room.mu.Unlock()
return &memorySession{stream: stream}, nil
})
}
func builtInCarrierNames() []string {
return []string{"jazz", "telemost", "wbstream", "jitsi"} //nolint:goconst // test literal, repetition is intentional
}
func builtInTransportNames() []string {
return []string{transportData, transportVideo, transportSEI, transportVP8}
}
func realE2ECaseExpectation(carrierName, transportName string) realE2EExpectation {
switch carrierName {
case "telemost":
switch transportName {
case transportVP8:
return realE2EExpectPass
case transportVideo:
return realE2EBestEffort
default:
return realE2EExpectFail
}
case "wbstream":
if transportName == transportData {
return realE2EExpectFail
}
return realE2EExpectPass
case "jazz":
if transportName == transportData {
return realE2EExpectPass
}
return realE2EExpectFail
case "jitsi":
// Jitsi colibri-ws bridge channel maps cleanly onto the
// datachannel transport (raw bytes broadcast through
// EndpointMessage). Video transports go through pion's
// PeerConnection negotiated via Jingle session-accept; results
// are bridge/instance dependent (some operators throttle or
// strip non-camera video), hence best-effort.
switch transportName {
case transportData:
return realE2EExpectPass
case transportVP8, transportVideo, transportSEI:
return realE2EBestEffort
default:
return realE2EBestEffort
}
default:
return realE2EExpectPass
}
}
func realE2EExpectationLabel(expectation realE2EExpectation) string {
switch expectation {
case realE2EExpectPass:
return "SUCCESS"
case realE2EBestEffort:
return "BEST EFFORT"
case realE2EExpectFail:
return "EXPECTED FAIL"
default:
return "UNKNOWN"
}
}
func TestRealE2ECaseExpectation(t *testing.T) {
tests := []struct {
name string
carrier string
transport string
want realE2EExpectation
}{
{
name: "jazz datachannel is expected to pass",
carrier: "jazz",
transport: transportData,
want: realE2EExpectPass,
},
{
name: "jazz videochannel is expected to fail",
carrier: "jazz",
transport: transportVideo,
want: realE2EExpectFail,
},
{
name: "jazz seichannel is expected to fail",
carrier: "jazz",
transport: transportSEI,
want: realE2EExpectFail,
},
{
name: "jazz vp8channel is expected to fail",
carrier: "jazz",
transport: transportVP8,
want: realE2EExpectFail,
},
{
name: "telemost datachannel is expected to fail",
carrier: "telemost",
transport: transportData,
want: realE2EExpectFail,
},
{
name: "telemost vp8channel is expected to pass",
carrier: "telemost",
transport: transportVP8,
want: realE2EExpectPass,
},
{
name: "wbstream datachannel is expected to fail",
carrier: "wbstream",
transport: transportData,
want: realE2EExpectFail,
},
{
name: "jitsi datachannel is expected to pass",
carrier: "jitsi",
transport: transportData,
want: realE2EExpectPass,
},
{
name: "jitsi vp8channel is best effort",
carrier: "jitsi",
transport: transportVP8,
want: realE2EBestEffort,
},
{
name: "jitsi videochannel is best effort",
carrier: "jitsi",
transport: transportVideo,
want: realE2EBestEffort,
},
{
name: "jitsi seichannel is best effort",
carrier: "jitsi",
transport: transportSEI,
want: realE2EBestEffort,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := realE2ECaseExpectation(tt.carrier, tt.transport); got != tt.want {
t.Fatalf("realE2ECaseExpectation(%q, %q) = %v, want %v", tt.carrier, tt.transport, got, tt.want)
}
})
}
}
func splitTestList(value string) []string {
parts := strings.Split(value, ",")
items := make([]string, 0, len(parts))
for _, part := range parts {
part = strings.TrimSpace(part)
if part != "" {
items = append(items, part)
}
}
return items
}
//nolint:cyclop // table-driven test naturally has many branches
func realRoomURL(ctx context.Context, t *testing.T, carrierName string) string {
t.Helper()
switch carrierName {
case "jazz":
if *realE2EJazzRoom != "" {
return *realE2EJazzRoom
}
room, err := authSaluteJazz.Provider{}.CreateRoom(ctx, auth.Config{Name: "olcrtc-e2e-room"})
if err != nil {
t.Skipf("skip jazz real e2e: create room failed: %v", err)
}
return room
case "telemost":
room := *realE2ETelemostRoom
if room != "" && !strings.HasPrefix(room, "http://") && !strings.HasPrefix(room, "https://") {
room = "https://telemost.yandex.ru/j/" + room
}
return room
case "wbstream":
if *realE2EWBStreamRoom != "" {
return *realE2EWBStreamRoom
}
room, err := authWBStream.Provider{}.CreateRoom(ctx, auth.Config{Name: "olcrtc-e2e-room"})
if err != nil {
t.Skipf("skip wbstream real e2e: create room failed: %v", err)
}
return room
case "jitsi":
// Jitsi has no notion of "creating" a room — names are conjured
// on first join. The default flag points at meet.cryptopro.ru
// (a CryptoPro-operated public Jitsi instance) with a fixed
// room slug so the server and client land in the same MUC.
_ = ctx
room := *realE2EJitsiRoom
if room == "" {
t.Skip("skip jitsi real e2e: empty -olcrtc.real-jitsi-room")
}
return room
default:
return ""
}
}
func requireRealRoom(ctx context.Context, t *testing.T, carrierName string) string {
t.Helper()
roomURL := realRoomURL(ctx, t, carrierName)
if roomURL == "" {
t.Fatalf("missing room for %s", carrierName)
}
return roomURL
}
func validSessionConfig(mode, carrierName, transportName string) session.Config {
return session.Config{
Mode: mode,
Link: linkDirect,
Transport: transportName,
Auth: carrierName,
RoomID: testRoom,
KeyHex: testKeyHex,
SOCKSHost: "127.0.0.1",
SOCKSPort: 1080,
DNSServer: localDNSServer,
VideoWidth: 1080,
VideoHeight: 1080,
VideoFPS: 30,
VideoBitrate: "1M",
VideoHW: videoHWNone,
VideoCodec: "tile",
VideoTileModule: 4,
VideoTileRS: 20,
VP8FPS: 60,
VP8BatchSize: 8,
SEIFPS: 30,
SEIBatchSize: 4,
SEIFragmentSize: 512,
SEIAckTimeoutMS: 1500,
}
}
func validLinkConfig(carrierName, transportName string) link.Config {
cfg := validSessionConfig("cnc", carrierName, transportName)
return link.Config{
Transport: cfg.Transport,
Carrier: cfg.Auth,
RoomURL: testRoom,
DeviceID: "e2e-link-test",
Name: "e2e-" + carrierName + "-" + transportName,
DNSServer: cfg.DNSServer,
VideoWidth: cfg.VideoWidth,
VideoHeight: cfg.VideoHeight,
VideoFPS: cfg.VideoFPS,
VideoBitrate: cfg.VideoBitrate,
VideoHW: cfg.VideoHW,
VideoCodec: cfg.VideoCodec,
VideoTileModule: cfg.VideoTileModule,
VideoTileRS: cfg.VideoTileRS,
VP8FPS: cfg.VP8FPS,
VP8BatchSize: cfg.VP8BatchSize,
SEIFPS: cfg.SEIFPS,
SEIBatchSize: cfg.SEIBatchSize,
SEIFragmentSize: cfg.SEIFragmentSize,
SEIAckTimeoutMS: cfg.SEIAckTimeoutMS,
}
}
func startEchoServer(t *testing.T) string {
t.Helper()
var lc net.ListenConfig
ln, err := lc.Listen(context.Background(), "tcp4", "127.0.0.1:0")
if err != nil {
t.Fatalf("listen echo: %v", err)
}
t.Cleanup(func() { _ = ln.Close() })
go func() {
for {
conn, err := ln.Accept()
if err != nil {
return
}
go func() {
defer func() { _ = conn.Close() }()
_, _ = io.Copy(conn, conn)
}()
}
}()
return ln.Addr().String()
}
func freeLocalAddr(ctx context.Context, t *testing.T) string {
t.Helper()
var lc net.ListenConfig
ln, err := lc.Listen(ctx, "tcp4", "127.0.0.1:0")
if err != nil {
t.Fatalf("reserve local addr: %v", err)
}
addr := ln.Addr().String()
if err := ln.Close(); err != nil {
t.Fatalf("close reserved addr: %v", err)
}
return addr
}
func waitForReady(t *testing.T, ready <-chan struct{}) {
t.Helper()
select {
case <-ready:
case <-time.After(3 * time.Second):
t.Fatal("client did not become ready")
}
}
type tunnelRuntime struct {
socksAddr string
room *memoryRoom
cancel context.CancelFunc
serverErr chan error
clientErr chan error
stopWait time.Duration
}
func startTunnel(t *testing.T) *tunnelRuntime {
t.Helper()
carrierName, room := registerMemoryCarrier(t)
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)
socksAddr := freeLocalAddr(ctx, t)
serverErr := make(chan error, 1)
go func() {
serverErr <- server.Run(ctx, server.Config{
Link: linkDirect,
Transport: transportData,
Carrier: carrierName,
RoomURL: testRoom,
KeyHex: testKeyHex,
DNSServer: localDNSServer,
})
}()
room.waitConnected(t, 1)
ready := make(chan struct{})
clientErr := make(chan error, 1)
go func() {
clientErr <- client.RunWithReady(ctx, client.Config{
Link: linkDirect,
Transport: transportData,
Carrier: carrierName,
RoomURL: testRoom,
KeyHex: testKeyHex,
DeviceID: testClientDeviceID,
LocalAddr: socksAddr,
DNSServer: localDNSServer,
}, func() { close(ready) })
}()
waitForReady(t, ready)
return &tunnelRuntime{
socksAddr: socksAddr,
room: room,
cancel: cancel,
serverErr: serverErr,
clientErr: clientErr,
stopWait: 3 * time.Second,
}
}
func startRealTunnel(
ctx context.Context,
t *testing.T,
carrierName, transportName, roomURL, _, clientDeviceID string,
) (*tunnelRuntime, error) {
t.Helper()
session.RegisterDefaults()
socksAddr := freeLocalAddr(ctx, t)
runCtx, cancel := context.WithCancel(ctx)
t.Cleanup(cancel)
serverErr := make(chan error, 1)
go func() {
serverErr <- server.Run(runCtx, server.Config{
Link: linkDirect,
Transport: transportName,
Carrier: carrierName,
RoomURL: roomURL,
KeyHex: testKeyHex,
DNSServer: localDNSServer,
VideoWidth: 1080,
VideoHeight: 1080,
VideoFPS: 60,
VideoBitrate: "5000k",
VideoHW: videoHWNone,
VideoQRSize: 512,
VideoQRRecovery: "low",
VideoCodec: "qrcode",
VideoTileModule: 4,
VideoTileRS: 20,
VP8FPS: 60,
VP8BatchSize: 8,
SEIFPS: 30,
SEIBatchSize: 4,
SEIFragmentSize: 512,
SEIAckTimeoutMS: 1500,
})
}()
select {
case err := <-serverErr:
cancel()
return nil, fmt.Errorf("server exited before client start: %w", err)
case <-time.After(2 * time.Second):
case <-runCtx.Done():
cancel()
return nil, fmt.Errorf("server context ended before client start: %w", runCtx.Err())
}
ready := make(chan struct{})
clientErr := make(chan error, 1)
go func() {
clientErr <- client.RunWithReady(runCtx, client.Config{
Link: linkDirect,
Transport: transportName,
Carrier: carrierName,
RoomURL: roomURL,
KeyHex: testKeyHex,
DeviceID: clientDeviceID,
LocalAddr: socksAddr,
DNSServer: localDNSServer,
VideoWidth: 1080,
VideoHeight: 1080,
VideoFPS: 60,
VideoBitrate: "5000k",
VideoHW: videoHWNone,
VideoQRSize: 512,
VideoQRRecovery: "low",
VideoCodec: "qrcode",
VideoTileModule: 4,
VideoTileRS: 20,
VP8FPS: 60,
VP8BatchSize: 8,
SEIFPS: 30,
SEIBatchSize: 4,
SEIFragmentSize: 512,
SEIAckTimeoutMS: 1500,
}, func() { close(ready) })
}()
select {
case <-ready:
case err := <-clientErr:
cancel()
return nil, fmt.Errorf("client exited before ready: %w", err)
case err := <-serverErr:
cancel()
return nil, fmt.Errorf("server exited before client ready: %w", err)
case <-time.After(*realE2ETimeout):
cancel()
return nil, errRealE2ENotReady
case <-runCtx.Done():
cancel()
return nil, fmt.Errorf("real e2e context ended before ready: %w", runCtx.Err())
}
return &tunnelRuntime{
socksAddr: socksAddr,
cancel: cancel,
serverErr: serverErr,
clientErr: clientErr,
stopWait: 20 * time.Second,
}, nil
}
func (r *tunnelRuntime) stop(t *testing.T) {
t.Helper()
if err := r.stopErr(); err != nil {
t.Fatal(err)
}
}
func (r *tunnelRuntime) waitStopped(t *testing.T) {
t.Helper()
if err := r.waitStoppedErr(); err != nil {
t.Fatal(err)
}
}
func (r *tunnelRuntime) stopErr() error {
r.cancel()
return r.waitStoppedErr()
}
func (r *tunnelRuntime) waitStoppedErr() error {
stopWait := r.stopWait
if stopWait <= 0 {
stopWait = 3 * time.Second
}
for _, item := range []struct {
name string
ch <-chan error
}{
{name: "client", ch: r.clientErr},
{name: "server", ch: r.serverErr},
} {
select {
case err := <-item.ch:
if err != nil {
return fmt.Errorf("%s returned error: %w", item.name, err)
}
case <-time.After(stopWait):
return fmt.Errorf("%w: %s", errTunnelDidNotStop, item.name)
}
}
return nil
}
func connectViaSOCKS(t *testing.T, socksAddr, targetAddr string) net.Conn {
t.Helper()
dialer := net.Dialer{Timeout: 2 * time.Second}
conn, err := dialer.DialContext(context.Background(), "tcp4", socksAddr)
if err != nil {
t.Fatalf("dial socks: %v", err)
}
if _, err := conn.Write([]byte{5, 1, 0}); err != nil {
_ = conn.Close()
t.Fatalf("write socks greeting: %v", err)
}
greeting := make([]byte, 2)
if _, err := io.ReadFull(conn, greeting); err != nil {
_ = conn.Close()
t.Fatalf("read socks greeting: %v", err)
}
if !bytes.Equal(greeting, []byte{5, 0}) {
_ = conn.Close()
t.Fatalf("socks greeting = %v, want [5 0]", greeting)
}
host, portText, err := net.SplitHostPort(targetAddr)
if err != nil {
_ = conn.Close()
t.Fatalf("split target addr: %v", err)
}
port, err := strconv.Atoi(portText)
if err != nil {
_ = conn.Close()
t.Fatalf("parse target port: %v", err)
}
req := make([]byte, 0, 10)
req = append(req, 5, 1, 0, 1)
req = append(req, net.ParseIP(host).To4()...)
var portBuf [2]byte
binary.BigEndian.PutUint16(portBuf[:], uint16(port)) //nolint:gosec // SOCKS5 port is uint16 by definition
req = append(req, portBuf[:]...)
if _, err := conn.Write(req); err != nil {
_ = conn.Close()
t.Fatalf("write socks connect: %v", err)
}
reply := make([]byte, 10)
if _, err := io.ReadFull(conn, reply); err != nil {
_ = conn.Close()
t.Fatalf("read socks connect reply: %v", err)
}
if !bytes.Equal(reply, []byte{5, 0, 0, 1, 0, 0, 0, 0, 0, 0}) {
_ = conn.Close()
t.Fatalf("socks reply = %v, want success", reply)
}
return conn
}
func TestBuiltInProviderTransportMatrixValidates(t *testing.T) {
session.RegisterDefaults()
for _, mode := range []string{"srv", "cnc"} {
t.Run(mode, func(t *testing.T) {
for _, carrierName := range builtInCarrierNames() {
t.Run(carrierName, func(t *testing.T) {
for _, transportName := range builtInTransportNames() {
t.Run(transportName, func(t *testing.T) {
cfg := validSessionConfig(mode, carrierName, transportName)
if err := session.Validate(cfg); err != nil {
t.Fatalf("Validate() error = %v", err)
}
})
}
})
}
})
}
}
func TestDirectLinkCreatesAllProviderTransportCombinations(t *testing.T) {
session.RegisterDefaults()
for _, carrierName := range builtInCarrierNames() {
registerMemoryCarrierAs(t, carrierName)
}
for _, carrierName := range builtInCarrierNames() {
t.Run(carrierName, func(t *testing.T) {
for _, transportName := range builtInTransportNames() {
t.Run(transportName, func(t *testing.T) {
ln, err := link.New(context.Background(), linkDirect, validLinkConfig(carrierName, transportName))
if err != nil {
t.Fatalf("link.New() error = %v", err)
}
if err := ln.Close(); err != nil {
t.Fatalf("Close() error = %v", err)
}
})
}
})
}
}
func TestDirectLinkConnectsFastProviderTransportMatrix(t *testing.T) {
session.RegisterDefaults()
for _, carrierName := range builtInCarrierNames() {
registerMemoryCarrierAs(t, carrierName)
}
for _, carrierName := range builtInCarrierNames() {
t.Run(carrierName, func(t *testing.T) {
for _, transportName := range []string{transportData, transportSEI} {
t.Run(transportName, func(t *testing.T) {
ln, err := link.New(context.Background(), linkDirect, validLinkConfig(carrierName, transportName))
if err != nil {
t.Fatalf("link.New() error = %v", err)
}
if err := ln.Connect(context.Background()); err != nil {
t.Fatalf("Connect() error = %v", err)
}
if !ln.CanSend() {
t.Fatal("CanSend() = false, want true")
}
if err := ln.Close(); err != nil {
t.Fatalf("Close() error = %v", err)
}
})
}
})
}
}
//nolint:cyclop // table-driven test naturally has many branches
func TestRealProviderTransportMatrix(t *testing.T) {
if !*realE2E {
t.Skip("real provider e2e disabled; pass -olcrtc.real-e2e with provider room flags")
}
carriers := splitTestList(*realE2ECarriers)
transports := splitTestList(*realE2ETransports)
if len(carriers) == 0 {
t.Fatal("no real e2e carriers selected")
}
if len(transports) == 0 {
t.Fatal("no real e2e transports selected")
}
echoAddr := startEchoServer(t)
for _, carrierName := range carriers {
t.Run(carrierName, func(t *testing.T) {
roomCtx, cancelRoom := context.WithTimeout(context.Background(), *realE2ETimeout)
defer cancelRoom()
roomURL := requireRealRoom(roomCtx, t, carrierName)
for _, transportName := range transports {
t.Run(transportName, func(t *testing.T) {
expectation := realE2ECaseExpectation(carrierName, transportName)
label := realE2EExpectationLabel(expectation)
err := runRealE2ECase(t, carrierName, transportName, roomURL, echoAddr)
switch {
case err == nil && expectation == realE2EExpectPass:
t.Logf("%s %s/%s", label, carrierName, transportName)
case err == nil && expectation == realE2EExpectFail:
t.Fatalf("UNEXPECTED SUCCESS %s/%s", carrierName, transportName)
case err != nil && expectation == realE2EExpectPass:
t.Fatalf("EXPECTED SUCCESS %s/%s failed: %v", carrierName, transportName, err)
case err != nil && expectation == realE2EExpectFail:
t.Logf("%s %s/%s: %v", label, carrierName, transportName, err)
case err == nil && expectation == realE2EBestEffort:
t.Logf("%s %s/%s succeeded", label, carrierName, transportName)
case err != nil && expectation == realE2EBestEffort:
t.Logf("%s %s/%s failed: %v", label, carrierName, transportName, err)
}
})
}
})
}
}
func runRealE2ECase(t *testing.T, carrierName, transportName, roomURL, echoAddr string) (err error) {
t.Helper()
ctx, cancel := context.WithTimeout(context.Background(), *realE2ETimeout)
defer cancel()
rt, err := startRealTunnel(ctx, t, carrierName, transportName, roomURL, testClientDeviceID, testClientDeviceID)
if err != nil {
return err
}
defer func() {
if stopErr := rt.stopErr(); err == nil && stopErr != nil {
err = stopErr
}
}()
conn, err := connectViaSOCKSWithin(rt.socksAddr, echoAddr, *realE2ETimeout)
if err != nil {
return err
}
defer func() { _ = conn.Close() }()
payload := []byte("olcrtc-real-e2e-" + carrierName + "-" + transportName + "\n")
if _, err := conn.Write(payload); err != nil {
return fmt.Errorf("write real e2e payload: %w", err)
}
if err := conn.SetReadDeadline(time.Now().Add(*realE2ETimeout)); err != nil {
return fmt.Errorf("set real e2e read deadline: %w", err)
}
line, err := bufio.NewReader(conn).ReadBytes('\n')
if err != nil {
return fmt.Errorf("read real e2e echo: %w", err)
}
if !bytes.Equal(line, payload) {
return fmt.Errorf("%w: got %q, want %q", errRealE2EEchoMismatch, line, payload)
}
return nil
}
func TestClientServerSOCKSTunnelOverMemoryDatachannel(t *testing.T) {
echoAddr := startEchoServer(t)
rt := startTunnel(t)
defer rt.stop(t)
conn := connectViaSOCKS(t, rt.socksAddr, echoAddr)
defer func() { _ = conn.Close() }()
payload := []byte("olcrtc-e2e-payload\n")
if _, err := conn.Write(payload); err != nil {
t.Fatalf("write tunneled payload: %v", err)
}
if err := conn.SetReadDeadline(time.Now().Add(3 * time.Second)); err != nil {
t.Fatalf("set read deadline: %v", err)
}
line, err := bufio.NewReader(conn).ReadBytes('\n')
if err != nil {
t.Fatalf("read tunneled echo: %v", err)
}
if !bytes.Equal(line, payload) {
t.Fatalf("echo = %q, want %q", line, payload)
}
}
func TestFrequentReconnectsStillAllowNewSOCKSConnections(t *testing.T) {
echoAddr := startEchoServer(t)
rt := startTunnel(t)
defer rt.stop(t)
for i := range 5 {
rt.room.triggerReconnect()
conn := eventuallyConnectViaSOCKS(t, rt.socksAddr, echoAddr)
payload := fmt.Appendf(nil, "after-reconnect-%d\n", i)
if _, err := conn.Write(payload); err != nil {
_ = conn.Close()
t.Fatalf("write after reconnect %d: %v", i, err)
}
if err := conn.SetReadDeadline(time.Now().Add(3 * time.Second)); err != nil {
_ = conn.Close()
t.Fatalf("set deadline after reconnect %d: %v", i, err)
}
line, err := bufio.NewReader(conn).ReadBytes('\n')
_ = conn.Close()
if err != nil {
t.Fatalf("read after reconnect %d: %v", i, err)
}
if !bytes.Equal(line, payload) {
t.Fatalf("echo after reconnect %d = %q, want %q", i, line, payload)
}
}
}
func TestEndedCallbackStopsClientAndServer(t *testing.T) {
rt := startTunnel(t)
rt.room.triggerEnded("conference ended")
rt.waitStopped(t)
}
func eventuallyConnectViaSOCKS(t *testing.T, socksAddr, targetAddr string) net.Conn {
t.Helper()
return eventuallyConnectViaSOCKSWithin(t, socksAddr, targetAddr, 3*time.Second)
}
func eventuallyConnectViaSOCKSWithin(t *testing.T, socksAddr, targetAddr string, timeout time.Duration) net.Conn {
t.Helper()
conn, err := connectViaSOCKSWithin(socksAddr, targetAddr, timeout)
if err != nil {
t.Fatal(err)
}
return conn
}
func connectViaSOCKSWithin(socksAddr, targetAddr string, timeout time.Duration) (net.Conn, error) {
deadline := time.Now().Add(timeout)
var lastErr error
attempt := 0
for time.Now().Before(deadline) {
conn, err := tryConnectViaSOCKS(socksAddr, targetAddr)
if err == nil {
return conn, nil
}
lastErr = err
attempt++
sleep := 250 * time.Millisecond
if attempt > 3 {
sleep = time.Second
}
time.Sleep(sleep)
}
return nil, fmt.Errorf("connect via SOCKS failed after %s: %w", timeout, lastErr)
}
func tryConnectViaSOCKS(socksAddr, targetAddr string) (net.Conn, error) {
dialer := net.Dialer{Timeout: 500 * time.Millisecond}
conn, err := dialer.DialContext(context.Background(), "tcp4", socksAddr)
if err != nil {
return nil, fmt.Errorf("dial socks: %w", err)
}
if _, err := conn.Write([]byte{5, 1, 0}); err != nil {
_ = conn.Close()
return nil, fmt.Errorf("write greeting: %w", err)
}
greeting := make([]byte, 2)
if _, err := io.ReadFull(conn, greeting); err != nil {
_ = conn.Close()
return nil, fmt.Errorf("read greeting: %w", err)
}
if !bytes.Equal(greeting, []byte{5, 0}) {
_ = conn.Close()
return nil, fmt.Errorf("%w: %v", errSocksUnexpectedHello, greeting)
}
host, portText, err := net.SplitHostPort(targetAddr)
if err != nil {
_ = conn.Close()
return nil, fmt.Errorf("split host port: %w", err)
}
port, err := strconv.Atoi(portText)
if err != nil {
_ = conn.Close()
return nil, fmt.Errorf("parse port: %w", err)
}
req := make([]byte, 0, 10)
req = append(req, 5, 1, 0, 1)
req = append(req, net.ParseIP(host).To4()...)
var portBuf [2]byte
binary.BigEndian.PutUint16(portBuf[:], uint16(port)) //nolint:gosec // SOCKS5 port is uint16 by definition
req = append(req, portBuf[:]...)
if _, err := conn.Write(req); err != nil {
_ = conn.Close()
return nil, fmt.Errorf("write connect request: %w", err)
}
reply := make([]byte, 10)
if _, err := io.ReadFull(conn, reply); err != nil {
_ = conn.Close()
return nil, fmt.Errorf("read connect reply: %w", err)
}
if !bytes.Equal(reply, []byte{5, 0, 0, 1, 0, 0, 0, 0, 0, 0}) {
_ = conn.Close()
return nil, fmt.Errorf("%w: %v", errSocksUnexpectedReply, reply)
}
return conn, nil
}
func TestLargeTransferOverTunnel(t *testing.T) {
echoAddr := startEchoServer(t)
rt := startTunnel(t)
defer rt.stop(t)
size := int64(32 << 20)
conn := connectViaSOCKS(t, rt.socksAddr, echoAddr)
defer func() { _ = conn.Close() }()
if err := streamPatternAndVerifyEcho(conn, size); err != nil {
t.Fatalf("large transfer %d bytes failed: %v", size, err)
}
}
func streamPatternAndVerifyEcho(conn net.Conn, size int64) error {
errCh := make(chan error, 1)
go func() {
buf := make([]byte, 32*1024)
var written int64
for written < size {
n := len(buf)
if remaining := size - written; remaining < int64(n) {
n = int(remaining)
}
fillPattern(buf[:n], written)
if _, err := conn.Write(buf[:n]); err != nil {
errCh <- fmt.Errorf("write at %d: %w", written, err)
return
}
written += int64(n)
}
errCh <- nil
}()
buf := make([]byte, 32*1024)
want := make([]byte, len(buf))
var read int64
for read < size {
n := len(buf)
if remaining := size - read; remaining < int64(n) {
n = int(remaining)
}
if err := conn.SetReadDeadline(time.Now().Add(10 * time.Second)); err != nil {
return fmt.Errorf("set read deadline: %w", err)
}
if _, err := io.ReadFull(conn, buf[:n]); err != nil {
return fmt.Errorf("read at %d: %w", read, err)
}
fillPattern(want[:n], read)
if !bytes.Equal(buf[:n], want[:n]) {
return fmt.Errorf("%w %d", errPayloadMismatchOffset, read)
}
read += int64(n)
}
if err := <-errCh; err != nil {
return err
}
return nil
}
func fillPattern(buf []byte, offset int64) {
for i := range buf {
buf[i] = byte((offset + int64(i)*31 + 7) & 0xff)
}
}