Files
olcrtc/internal/e2e/tunnel_test.go
zarazaex69 e7657b2619 refactor: remove link layer
internal/link and internal/link/direct were a single-implementation
abstraction layer where directLink mechanically proxied every method to
transport.Transport — only Features() lived above transport.Transport,
and even that was a Features() alias. Six layers of plumbing for zero
behavioural value.

Drop the layer entirely:
- muxconn.Conn now takes a transport.Transport directly.
- server.Server and client.Client store transport.Transport, call
  transport.New, and expose Features() through transport.Transport's
  built-in method.
- server.Config and client.Config lose their Link string field.
- session.Config loses Link + validateLink + ErrLinkRequired/ErrUnsupportedLink.
- config.File and config.Profile lose the link YAML key.
- pkg/olcrtc/tunnel.Config loses Link.
- mobile drops defaultLink, SetLink, and mobileConfig.link.

Two e2e tests that exercised link.New directly are renamed to call
transport.New (TestTransportCreatesAllProviderTransportCombinations and
TestTransportConnectsFastProviderTransportMatrix); behaviour is unchanged.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
2026-05-16 13:51:02 +03:00

1533 lines
42 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package e2e
import (
"bufio"
"bytes"
"context"
"crypto/rand"
"encoding/binary"
"encoding/hex"
"errors"
"flag"
"fmt"
"io"
"net"
"os"
"strconv"
"strings"
"sync"
"testing"
"time"
"github.com/openlibrecommunity/olcrtc/internal/app/session"
"github.com/openlibrecommunity/olcrtc/internal/auth"
authSaluteJazz "github.com/openlibrecommunity/olcrtc/internal/auth/salutejazz"
authWBStream "github.com/openlibrecommunity/olcrtc/internal/auth/wbstream"
"github.com/openlibrecommunity/olcrtc/internal/carrier"
"github.com/openlibrecommunity/olcrtc/internal/client"
"github.com/openlibrecommunity/olcrtc/internal/server"
"github.com/openlibrecommunity/olcrtc/internal/supervisor"
"github.com/openlibrecommunity/olcrtc/internal/transport"
"github.com/openlibrecommunity/olcrtc/internal/transport/seichannel"
"github.com/openlibrecommunity/olcrtc/internal/transport/videochannel"
"github.com/openlibrecommunity/olcrtc/internal/transport/vp8channel"
"github.com/pion/webrtc/v4"
)
// Shared fixtures used by the e2e tests in this file.
const (
// testKeyHex is the fixed 64-hex-character key passed as KeyHex to both the
// server and the client side of every test tunnel.
testKeyHex = "00112233445566778899aabbccddeeff00112233445566778899aabbccddeeff"
// Transport names used when building configs and iterating the matrix.
transportData = "datachannel"
transportVideo = "videochannel"
transportSEI = "seichannel"
transportVP8 = "vp8channel"
// testRoom is the room identifier used by the in-memory (non-real) tests.
testRoom = "room"
// localDNSServer is the DNSServer value handed to every test config.
localDNSServer = "127.0.0.1:53"
// videoHWNone is the HW / VideoHW value used in the test configs.
videoHWNone = "none"
// testClientDeviceID is the DeviceID used for the client side of a tunnel.
testClientDeviceID = "client-1"
// defaultJitsiRoomURL is the default of -olcrtc.real-jitsi-room; realRoomURL
// appends a per-process suffix when the flag is left at this value.
defaultJitsiRoomURL = "https://meet.cryptopro.ru/deadbeef"
)
// Sentinel errors returned or wrapped by the helpers in this file (some are
// used by helpers outside the visible part of the file).
var (
// errRealE2ENotReady: the real-provider client did not signal readiness
// within -olcrtc.real-timeout (see startRealTunnel).
errRealE2ENotReady = errors.New("real e2e client did not become ready")
// errTunnelDidNotStop: a server/client goroutine did not report its result
// within the stop wait (see tunnelRuntime.waitStoppedErr).
errTunnelDidNotStop = errors.New("tunnel goroutine did not stop")
// errRealE2EEchoMismatch: the echoed line differs from the payload sent
// (see runRealE2ECase).
errRealE2EEchoMismatch = errors.New("real e2e echo payload mismatch")
errSocksUnexpectedReply = errors.New("unexpected SOCKS5 reply")
errSocksUnexpectedHello = errors.New("unexpected SOCKS5 greeting")
errPayloadMismatchOffset = errors.New("payload mismatch at offset")
// errFailoverCarrier is what the carrier registered by
// registerFailingCarrier always returns.
errFailoverCarrier = errors.New("intentional failover carrier failure")
)
// Command-line flags that configure the real-provider e2e matrix
// (TestRealProviderTransportMatrix). All of them are pointers returned by the
// flag package, so they must be dereferenced after flag parsing.
var (
// realE2E gates the whole real-provider matrix; it is skipped when false.
realE2E = flag.Bool( //nolint:gochecknoglobals // package-level state intentional
"olcrtc.real-e2e",
false,
"run real provider e2e matrix against external WebRTC services",
)
// realE2ECarriers is a comma-separated list parsed with splitTestList.
realE2ECarriers = flag.String( //nolint:gochecknoglobals // package-level state intentional
"olcrtc.real-carriers",
"jitsi,telemost,wbstream",
"comma-separated carriers for real e2e",
)
// realE2ETransports is a comma-separated list parsed with splitTestList.
realE2ETransports = flag.String( //nolint:gochecknoglobals // package-level state intentional
"olcrtc.real-transports",
"datachannel,videochannel,seichannel,vp8channel",
"comma-separated transports for real e2e",
)
// realE2EJazzRoom: when empty, realRoomURL asks the SaluteJazz provider to
// create a room.
realE2EJazzRoom = flag.String( //nolint:gochecknoglobals // package-level state intentional
"olcrtc.real-jazz-room",
"",
"SaluteJazz room for real e2e, format roomID:password; autogenerated when empty",
)
// realE2ETelemostRoom: realRoomURL prefixes a bare id with the Telemost
// join URL.
realE2ETelemostRoom = flag.String( //nolint:gochecknoglobals // package-level state intentional
"olcrtc.real-telemost-room",
"41514917109506",
"Telemost room URL or id for real e2e",
)
// realE2EWBStreamRoom: the default is non-empty, so a room is only
// autogenerated when the flag is explicitly set to the empty string.
realE2EWBStreamRoom = flag.String( //nolint:gochecknoglobals // package-level state intentional
"olcrtc.real-wbstream-room",
"019e23c2-a580-7550-b08a-7ac5342ca21f",
"WB Stream room id for real e2e; autogenerated when empty",
)
realE2EJitsiRoom = flag.String( //nolint:gochecknoglobals // package-level state intentional
"olcrtc.real-jitsi-room",
defaultJitsiRoomURL,
"Jitsi Meet room URL for real e2e (format https://host/room or host/room); "+
"when left at default, a per-process random suffix is appended so concurrent "+
"test runs don't share a room",
)
// realE2ETimeout bounds room creation, tunnel readiness, SOCKS connect and
// the echo read of each real e2e case.
realE2ETimeout = flag.Duration( //nolint:gochecknoglobals // package-level state intentional
"olcrtc.real-timeout",
90*time.Second,
"timeout per real e2e provider/transport case",
)
)
// realE2EExpectation is the expected outcome of one carrier×transport case in
// the real-provider matrix.
type realE2EExpectation int
const (
// realE2EExpectFail is the zero value: the combination is expected to
// fail, and an unexpected success fails the test.
realE2EExpectFail realE2EExpectation = iota
// realE2EExpectPass: the combination is expected to succeed.
realE2EExpectPass
// realE2EExpectUnstable marks a carrier×transport combo that is
// known to flap: it sometimes succeeds and sometimes fails for
// reasons outside our control (third-party server load, lossy SFU
// paths, etc.). The matrix runner records the outcome but does
// not fail the test either way. Use this sparingly — prefer
// ExpectPass / ExpectFail when the behaviour is deterministic.
realE2EExpectUnstable
)
// memorySession is the carrier.Session handed out by the in-memory test
// carriers. It wraps a single memoryStream that serves as both the byte
// stream and the video track.
type memorySession struct {
	stream *memoryStream
}

// Capabilities advertises both byte-stream and video-track support.
func (s *memorySession) Capabilities() carrier.Capabilities {
	var caps carrier.Capabilities
	caps.ByteStream = true
	caps.VideoTrack = true
	return caps
}

// OpenByteStream returns the session's single in-memory stream.
func (s *memorySession) OpenByteStream() (carrier.ByteStream, error) {
	var byteStream carrier.ByteStream = s.stream
	return byteStream, nil
}

// OpenVideoTrack returns the same in-memory stream as OpenByteStream.
func (s *memorySession) OpenVideoTrack() (carrier.VideoTrack, error) {
	var videoTrack carrier.VideoTrack = s.stream
	return videoTrack, nil
}
// memoryRoom is the shared in-process "room" of the memory carrier: the set
// of every memoryStream created for one registered carrier.
type memoryRoom struct {
	mu      sync.Mutex
	streams map[*memoryStream]struct{}
}

// connectedCount reports how many member streams are currently connected
// (and not closed).
func (r *memoryRoom) connectedCount() int {
	r.mu.Lock()
	defer r.mu.Unlock()

	var connected int
	for member := range r.streams {
		if !member.isConnected() {
			continue
		}
		connected++
	}
	return connected
}

// waitConnected polls every 10ms, for up to 3 seconds, until at least want
// streams are connected; it fails the test when the deadline passes first.
func (r *memoryRoom) waitConnected(t *testing.T, want int) {
	t.Helper()

	const (
		limit = 3 * time.Second
		poll  = 10 * time.Millisecond
	)
	for deadline := time.Now().Add(limit); time.Now().Before(deadline); time.Sleep(poll) {
		if r.connectedCount() >= want {
			return
		}
	}
	t.Fatalf("memory room connected streams = %d, want at least %d", r.connectedCount(), want)
}

// triggerReconnect fires the reconnect callback of every member stream
// concurrently and returns once all callbacks have returned.
func (r *memoryRoom) triggerReconnect() {
	// Snapshot the members so the callbacks run without the room lock held.
	r.mu.Lock()
	members := make([]*memoryStream, 0, len(r.streams))
	for member := range r.streams {
		members = append(members, member)
	}
	r.mu.Unlock()

	var inFlight sync.WaitGroup
	inFlight.Add(len(members))
	for _, member := range members {
		go func(ms *memoryStream) {
			defer inFlight.Done()
			ms.triggerReconnect()
		}(member)
	}
	inFlight.Wait()
}

// triggerEnded fires the ended callback of every member stream, one after
// another, passing reason through.
func (r *memoryRoom) triggerEnded(reason string) {
	// Snapshot the members so the callbacks run without the room lock held.
	r.mu.Lock()
	members := make([]*memoryStream, 0, len(r.streams))
	for member := range r.streams {
		members = append(members, member)
	}
	r.mu.Unlock()

	for i := range members {
		members[i].triggerEnded(reason)
	}
}
// memoryStream is one participant of a memoryRoom. memorySession returns it
// both as the carrier byte stream and as the carrier video track. Data sent
// on one stream is delivered to the onData callback of every other stream in
// the same room.
type memoryStream struct {
room *memoryRoom
// onData receives delivered payloads; set once at construction.
onData func([]byte)
// mu guards every field below.
mu sync.Mutex
connected bool
closed bool
reconnect func()
ended func(string)
// track and trackCB are only stored; nothing in the visible part of this
// file reads them back.
track webrtc.TrackLocal
trackCB func(*webrtc.TrackRemote, *webrtc.RTPReceiver)
// pending buffers payloads delivered before Connect was called.
pending [][]byte
}
// Connect marks the stream connected and flushes any payloads that were
// buffered while it was not yet connected. The callbacks run after the lock
// is released.
func (s *memoryStream) Connect(context.Context) error {
s.mu.Lock()
s.connected = true
pending := s.pending
s.pending = nil
onData := s.onData
s.mu.Unlock()
for _, payload := range pending {
if onData != nil {
onData(payload)
}
}
return nil
}
// Send copies data and delivers it to every other stream in the room. It
// returns io.ErrClosedPipe once the sending stream has been closed.
func (s *memoryStream) Send(data []byte) error {
s.mu.Lock()
if s.closed {
s.mu.Unlock()
return io.ErrClosedPipe
}
s.mu.Unlock()
// Detach from the caller's buffer before handing the bytes to peers.
payload := append([]byte(nil), data...)
// Snapshot the peers under the room lock, deliver without holding it.
s.room.mu.Lock()
peers := make([]*memoryStream, 0, len(s.room.streams))
for peer := range s.room.streams {
if peer != s {
peers = append(peers, peer)
}
}
s.room.mu.Unlock()
for _, peer := range peers {
peer.deliver(payload)
}
return nil
}
// deliver hands data to this stream's onData callback. While the stream is
// neither connected nor closed the payload is queued for Connect instead;
// once closed (or when onData is nil) the payload is dropped.
func (s *memoryStream) deliver(data []byte) {
s.mu.Lock()
if !s.connected && !s.closed {
s.pending = append(s.pending, append([]byte(nil), data...))
s.mu.Unlock()
return
}
ready := !s.closed && s.onData != nil
onData := s.onData
s.mu.Unlock()
if ready {
// Each receiver gets its own copy of the payload.
onData(append([]byte(nil), data...))
}
}
// Close marks the stream closed and disconnected. The stream is not removed
// from the room's stream set.
func (s *memoryStream) Close() error {
s.mu.Lock()
s.closed = true
s.connected = false
s.mu.Unlock()
return nil
}
// SetReconnectCallback stores the callback invoked by triggerReconnect.
func (s *memoryStream) SetReconnectCallback(cb func()) {
s.mu.Lock()
s.reconnect = cb
s.mu.Unlock()
}
// SetShouldReconnect is a no-op; the argument is ignored.
func (s *memoryStream) SetShouldReconnect(func() bool) {}
// SetEndedCallback stores the callback invoked by triggerEnded.
func (s *memoryStream) SetEndedCallback(cb func(string)) {
s.mu.Lock()
s.ended = cb
s.mu.Unlock()
}
// WatchConnection blocks until ctx is done.
func (s *memoryStream) WatchConnection(ctx context.Context) {
<-ctx.Done()
}
// CanSend reports whether the stream is connected and not closed.
func (s *memoryStream) CanSend() bool {
return s.isConnected()
}
// AddTrack stores track and always succeeds.
func (s *memoryStream) AddTrack(track webrtc.TrackLocal) error {
s.mu.Lock()
s.track = track
s.mu.Unlock()
return nil
}
// SetTrackHandler stores cb.
func (s *memoryStream) SetTrackHandler(cb func(*webrtc.TrackRemote, *webrtc.RTPReceiver)) {
s.mu.Lock()
s.trackCB = cb
s.mu.Unlock()
}
// isConnected reports connected && !closed under the stream lock.
func (s *memoryStream) isConnected() bool {
s.mu.Lock()
defer s.mu.Unlock()
return s.connected && !s.closed
}
// triggerReconnect invokes the stored reconnect callback, but only when the
// stream is connected, not closed, and a callback has been set. The callback
// runs after the lock is released.
func (s *memoryStream) triggerReconnect() {
s.mu.Lock()
reconnect := s.reconnect
ready := s.connected && !s.closed && reconnect != nil
s.mu.Unlock()
if ready {
reconnect()
}
}
// triggerEnded invokes the stored ended callback with reason, under the same
// conditions as triggerReconnect.
func (s *memoryStream) triggerEnded(reason string) {
s.mu.Lock()
ended := s.ended
ready := s.connected && !s.closed && ended != nil
s.mu.Unlock()
if ready {
ended(reason)
}
}
// registerMemoryCarrier registers an in-memory carrier under a name derived
// from the running test ("e2e-memory-" + t.Name()) and returns that name with
// the room shared by every session the carrier creates.
func registerMemoryCarrier(t *testing.T) (string, *memoryRoom) {
	t.Helper()
	session.RegisterDefaults()

	carrierName := "e2e-memory-" + t.Name()
	shared := &memoryRoom{streams: map[*memoryStream]struct{}{}}

	// Every new session adds one fresh stream to the shared room.
	join := func(_ context.Context, cfg carrier.Config) (carrier.Session, error) {
		member := &memoryStream{room: shared, onData: cfg.OnData}

		shared.mu.Lock()
		shared.streams[member] = struct{}{}
		shared.mu.Unlock()

		return &memorySession{stream: member}, nil
	}
	carrier.Register(carrierName, join)

	return carrierName, shared
}
// registerMemoryCarrierAs registers an in-memory carrier under the exact name
// given, with its own private room. Unlike registerMemoryCarrier it does not
// call session.RegisterDefaults and does not expose the room.
//
// NOTE(review): when name is a built-in carrier name this presumably replaces
// the real carrier in the registry for the rest of the process — confirm
// against carrier.Register.
func registerMemoryCarrierAs(t *testing.T, name string) {
	t.Helper()

	private := &memoryRoom{streams: map[*memoryStream]struct{}{}}

	// Every new session adds one fresh stream to the private room.
	join := func(_ context.Context, cfg carrier.Config) (carrier.Session, error) {
		member := &memoryStream{room: private, onData: cfg.OnData}

		private.mu.Lock()
		private.streams[member] = struct{}{}
		private.mu.Unlock()

		return &memorySession{stream: member}, nil
	}
	carrier.Register(name, join)
}
// registerFailingCarrier registers a carrier, named "e2e-fail-" + t.Name(),
// whose session factory always fails with errFailoverCarrier. It returns the
// registered name.
func registerFailingCarrier(t *testing.T) string {
	t.Helper()
	session.RegisterDefaults()

	alwaysFail := func(context.Context, carrier.Config) (carrier.Session, error) {
		return nil, errFailoverCarrier
	}
	carrierName := "e2e-fail-" + t.Name()
	carrier.Register(carrierName, alwaysFail)

	return carrierName
}
// builtInCarrierNames returns the carrier names the matrix tests iterate
// over. A fresh slice is returned on every call.
func builtInCarrierNames() []string {
	names := [...]string{"jazz", "telemost", "wbstream", "jitsi"} //nolint:goconst // test literal, repetition is intentional
	return names[:]
}
// builtInTransportNames returns the transport names the matrix tests iterate
// over, in the order datachannel, videochannel, seichannel, vp8channel.
func builtInTransportNames() []string {
	names := [...]string{transportData, transportVideo, transportSEI, transportVP8}
	return names[:]
}
// realE2ECaseExpectation returns the expected outcome of running the given
// transport over the given real carrier. Carriers not listed here are
// expected to pass on every transport.
//
//nolint:cyclop // matrix of carrier×transport expectations is naturally branchy
func realE2ECaseExpectation(carrierName, transportName string) realE2EExpectation {
	switch carrierName {
	case "telemost":
		// Only the vp8channel and videochannel transports are expected to work.
		if transportName == transportVP8 || transportName == transportVideo {
			return realE2EExpectPass
		}
		return realE2EExpectFail
	case "wbstream":
		// Everything except datachannel is expected to work.
		if transportName != transportData {
			return realE2EExpectPass
		}
		return realE2EExpectFail
	case "jazz":
		// Only datachannel is expected to work.
		if transportName != transportData {
			return realE2EExpectFail
		}
		return realE2EExpectPass
	case "jitsi":
		// Jitsi colibri-ws bridge channel maps cleanly onto the
		// datachannel transport (raw bytes broadcast through
		// EndpointMessage). Video transports go through pion's
		// PeerConnection negotiated via Jingle session-accept.
		//
		// seichannel is marked Unstable: SEI NAL data piggybacks on
		// the H.264 video stream, and Jicofo's bandwidth allocator
		// for self-hosted Jitsi instances (e.g. meet.cryptopro.ru)
		// periodically suppresses the video upstream when there's
		// no obvious viewer demand, which manifests as recurring
		// "seichannel ack timeout" against an otherwise healthy
		// PeerConnection. The transport works in steady state but
		// is not deterministic enough to gate CI on; flag it but
		// don't fail the suite when it flaps.
		if transportName != transportSEI {
			return realE2EExpectPass
		}
		return realE2EExpectUnstable
	}
	return realE2EExpectPass
}
// realE2EExpectationLabel returns the log label for an expectation, or
// "UNKNOWN" for a value outside the declared constants.
func realE2EExpectationLabel(expectation realE2EExpectation) string {
	labels := map[realE2EExpectation]string{
		realE2EExpectPass:     "SUCCESS",
		realE2EExpectFail:     "EXPECTED FAIL",
		realE2EExpectUnstable: "UNSTABLE",
	}
	if label, known := labels[expectation]; known {
		return label
	}
	return "UNKNOWN"
}
// logUnstableOutcome logs how an Unstable matrix entry turned out and never
// fails the test. Unstable entries keep the matrix honest about transports
// that flap against a particular carrier (e.g. seichannel against
// meet.cryptopro.ru's bandwidth allocator) while still showing whether this
// particular run passed or failed.
func logUnstableOutcome(t *testing.T, label, carrierName, transportName string, err error) {
	t.Helper()
	if err != nil {
		t.Logf("%s FAIL %s/%s: %v", label, carrierName, transportName, err)
		return
	}
	t.Logf("%s PASS %s/%s", label, carrierName, transportName)
}
// TestRealE2ECaseExpectation pins a selection of entries of the
// carrier×transport expectation matrix returned by realE2ECaseExpectation.
func TestRealE2ECaseExpectation(t *testing.T) {
	type matrixCase struct {
		name      string
		carrier   string
		transport string
		want      realE2EExpectation
	}

	cases := []matrixCase{
		{"jazz datachannel is expected to pass", "jazz", transportData, realE2EExpectPass},
		{"jazz videochannel is expected to fail", "jazz", transportVideo, realE2EExpectFail},
		{"jazz seichannel is expected to fail", "jazz", transportSEI, realE2EExpectFail},
		{"jazz vp8channel is expected to fail", "jazz", transportVP8, realE2EExpectFail},
		{"telemost datachannel is expected to fail", "telemost", transportData, realE2EExpectFail},
		{"telemost vp8channel is expected to pass", "telemost", transportVP8, realE2EExpectPass},
		{"wbstream datachannel is expected to fail", "wbstream", transportData, realE2EExpectFail},
		{"jitsi datachannel is expected to pass", "jitsi", transportData, realE2EExpectPass},
		{"jitsi vp8channel is expected to pass", "jitsi", transportVP8, realE2EExpectPass},
		{"jitsi videochannel is expected to pass", "jitsi", transportVideo, realE2EExpectPass},
		{"jitsi seichannel is unstable", "jitsi", transportSEI, realE2EExpectUnstable},
	}

	for _, tc := range cases {
		t.Run(tc.name, func(t *testing.T) {
			got := realE2ECaseExpectation(tc.carrier, tc.transport)
			if got == tc.want {
				return
			}
			t.Fatalf("realE2ECaseExpectation(%q, %q) = %v, want %v", tc.carrier, tc.transport, got, tc.want)
		})
	}
}
// splitTestList parses a comma-separated flag value into its non-empty,
// whitespace-trimmed items. The result is never nil, even for an empty input.
func splitTestList(value string) []string {
	items := make([]string, 0, strings.Count(value, ",")+1)
	for rest, more := value, true; more; {
		var field string
		field, rest, more = strings.Cut(rest, ",")
		if field = strings.TrimSpace(field); field != "" {
			items = append(items, field)
		}
	}
	return items
}
// realRoomURL resolves the room to use for a real e2e run against the given
// carrier, based on the corresponding -olcrtc.real-*-room flag. For jazz and
// wbstream an empty flag triggers room creation through the provider, and the
// test is skipped when that fails. An unknown carrier, or an empty telemost
// flag, yields "".
//
//nolint:cyclop // one branch per carrier
func realRoomURL(ctx context.Context, t *testing.T, carrierName string) string {
	t.Helper()

	const createdRoomName = "olcrtc-e2e-room"

	switch carrierName {
	case "jazz":
		if preset := *realE2EJazzRoom; preset != "" {
			return preset
		}
		created, err := authSaluteJazz.Provider{}.CreateRoom(ctx, auth.Config{Name: createdRoomName})
		if err != nil {
			t.Skipf("skip jazz real e2e: create room failed: %v", err)
		}
		return created

	case "telemost":
		room := *realE2ETelemostRoom
		hasScheme := strings.HasPrefix(room, "http://") || strings.HasPrefix(room, "https://")
		if room == "" || hasScheme {
			return room
		}
		// A bare room id is expanded to the Telemost join URL.
		return "https://telemost.yandex.ru/j/" + room

	case "wbstream":
		if preset := *realE2EWBStreamRoom; preset != "" {
			return preset
		}
		created, err := authWBStream.Provider{}.CreateRoom(ctx, auth.Config{Name: createdRoomName})
		if err != nil {
			t.Skipf("skip wbstream real e2e: create room failed: %v", err)
		}
		return created

	case "jitsi":
		// Jitsi has no notion of "creating" a room — names are conjured
		// on first join. The default flag points at meet.cryptopro.ru
		// (a CryptoPro-operated public Jitsi instance). When the flag is
		// left at its default value, a per-process random suffix is appended
		// to the slug: two participants share a single room by design (one
		// pair, one shared key), so any third participant — including another
		// concurrent test process with the same shared key — would corrupt
		// the wire protocol on both sides. Users overriding the flag are
		// trusted to manage room uniqueness themselves.
		switch room := *realE2EJitsiRoom; room {
		case "":
			t.Skip("skip jitsi real e2e: empty -olcrtc.real-jitsi-room")
			return room
		case defaultJitsiRoomURL:
			return defaultJitsiRoomWithSuffix()
		default:
			return room
		}
	}
	return ""
}
// Per-process cache for the suffixed default Jitsi room URL.
var (
	jitsiRoomOnce sync.Once //nolint:gochecknoglobals // per-process suffix cache
	jitsiRoomURL  string    //nolint:gochecknoglobals // per-process suffix cache
)

// defaultJitsiRoomWithSuffix returns defaultJitsiRoomURL followed by "-" and
// eight random hex characters. The value is generated on the first call and
// reused for the rest of the process, so every sub-test (server and client)
// joins the same MUC.
func defaultJitsiRoomWithSuffix() string {
	jitsiRoomOnce.Do(func() {
		var raw [4]byte
		_, err := rand.Read(raw[:])
		if err == nil {
			jitsiRoomURL = defaultJitsiRoomURL + "-" + hex.EncodeToString(raw[:])
			return
		}
		// crypto/rand failing on a healthy host is exceptional; use the PID
		// as the suffix so the tests stay usable instead of blowing up here.
		jitsiRoomURL = fmt.Sprintf("%s-%d", defaultJitsiRoomURL, os.Getpid())
	})
	return jitsiRoomURL
}
// requireRealRoom is realRoomURL with a hard requirement: the test fails
// when no room could be resolved for the carrier.
func requireRealRoom(ctx context.Context, t *testing.T, carrierName string) string {
	t.Helper()
	if resolved := realRoomURL(ctx, t, carrierName); resolved != "" {
		return resolved
	}
	t.Fatalf("missing room for %s", carrierName)
	return ""
}
// validSessionConfig builds a fully populated session.Config for the given
// mode, carrier (stored in Auth) and transport, using the shared test room,
// key and DNS server plus fixed video/VP8/SEI tuning values.
func validSessionConfig(mode, carrierName, transportName string) session.Config {
	var cfg session.Config

	// Identity of the session under test.
	cfg.Mode = mode
	cfg.Transport = transportName
	cfg.Auth = carrierName
	cfg.RoomID = testRoom
	cfg.KeyHex = testKeyHex

	// Local endpoints.
	cfg.SOCKSHost = "127.0.0.1"
	cfg.SOCKSPort = 1080
	cfg.DNSServer = localDNSServer

	// videochannel tuning.
	cfg.VideoWidth = 1080
	cfg.VideoHeight = 1080
	cfg.VideoFPS = 30
	cfg.VideoBitrate = "1M"
	cfg.VideoHW = videoHWNone
	cfg.VideoCodec = "tile"
	cfg.VideoTileModule = 4
	cfg.VideoTileRS = 20

	// vp8channel tuning.
	cfg.VP8FPS = 60
	cfg.VP8BatchSize = 8

	// seichannel tuning.
	cfg.SEIFPS = 30
	cfg.SEIBatchSize = 4
	cfg.SEIFragmentSize = 512
	cfg.SEIAckTimeoutMS = 1500

	return cfg
}
// e2eTransportOptions returns the per-transport options bundle the e2e tests
// pass into server.Config / client.Config, so both ends of a tunnel are
// created with the same tuning. It returns nil for datachannel and for any
// unknown transport name.
//
// The vp8channel and seichannel values equal the corresponding
// validSessionConfig fields. The videochannel values do not: FPS, bitrate and
// codec here (60, "5000k", "qrcode") differ from validSessionConfig's
// (30, "1M", "tile").
func e2eTransportOptions(transportName string) transport.Options {
	if transportName == transportVP8 {
		return vp8channel.Options{FPS: 60, BatchSize: 8}
	}
	if transportName == transportSEI {
		return seichannel.Options{FPS: 30, BatchSize: 4, FragmentSize: 512, AckTimeoutMS: 1500}
	}
	if transportName != transportVideo {
		return nil
	}

	opts := videochannel.Options{
		Width:   1080,
		Height:  1080,
		FPS:     60,
		Bitrate: "5000k",
		HW:      videoHWNone,
		Codec:   "qrcode",
	}
	opts.QRSize = 512
	opts.QRRecovery = "low"
	opts.TileModule = 4
	opts.TileRS = 20
	return opts
}
// validTransportConfig builds the transport.Config used by the tests that
// call transport.New directly. Carrier and DNS server are taken from
// validSessionConfig in "cnc" mode; the options come from e2eTransportOptions.
func validTransportConfig(carrierName, transportName string) transport.Config {
	base := validSessionConfig("cnc", carrierName, transportName)

	var cfg transport.Config
	cfg.Carrier = base.Auth
	cfg.RoomURL = testRoom
	cfg.DeviceID = "e2e-link-test"
	cfg.Name = "e2e-" + carrierName + "-" + transportName
	cfg.DNSServer = base.DNSServer
	cfg.Options = e2eTransportOptions(transportName)
	return cfg
}
// startEchoServer starts a TCP echo server on an ephemeral 127.0.0.1 port and
// returns its address. The listener is closed by a test cleanup, which also
// ends the accept loop.
func startEchoServer(t *testing.T) string {
	t.Helper()

	listener, err := (&net.ListenConfig{}).Listen(context.Background(), "tcp4", "127.0.0.1:0")
	if err != nil {
		t.Fatalf("listen echo: %v", err)
	}
	t.Cleanup(func() { _ = listener.Close() })

	// echo copies everything read from peer back to it until the peer is done.
	echo := func(peer net.Conn) {
		defer func() { _ = peer.Close() }()
		_, _ = io.Copy(peer, peer)
	}

	go func() {
		for {
			peer, acceptErr := listener.Accept()
			if acceptErr != nil {
				// Accept fails once the listener is closed; stop serving.
				return
			}
			go echo(peer)
		}
	}()

	return listener.Addr().String()
}
// freeLocalAddr returns a 127.0.0.1 address with a port that was free a
// moment ago: it binds an ephemeral port, records the address and releases
// the listener again. Nothing keeps the port reserved after this returns.
func freeLocalAddr(ctx context.Context, t *testing.T) string {
	t.Helper()

	listener, err := (&net.ListenConfig{}).Listen(ctx, "tcp4", "127.0.0.1:0")
	if err != nil {
		t.Fatalf("reserve local addr: %v", err)
	}
	reserved := listener.Addr().String()

	if closeErr := listener.Close(); closeErr != nil {
		t.Fatalf("close reserved addr: %v", closeErr)
	}
	return reserved
}
// waitForReady blocks until ready is closed (or receives a value) and fails
// the test if that does not happen within 3 seconds.
func waitForReady(t *testing.T, ready <-chan struct{}) {
	t.Helper()

	timeout := time.NewTimer(3 * time.Second)
	defer timeout.Stop()

	select {
	case <-timeout.C:
		t.Fatal("client did not become ready")
	case <-ready:
	}
}
// tunnelRuntime is the handle to a running server+client tunnel pair started
// by startTunnel or startRealTunnel.
type tunnelRuntime struct {
// socksAddr is the LocalAddr given to the client, used by tests as the
// SOCKS endpoint.
socksAddr string
// room is the in-memory room; set by startTunnel only (startRealTunnel
// leaves it nil).
room *memoryRoom
// cancel cancels the context both goroutines run under.
cancel context.CancelFunc
// serverErr / clientErr each receive the single result of the server and
// client goroutine respectively.
serverErr chan error
clientErr chan error
// stopWait is how long waitStoppedErr waits per goroutine; values <= 0
// fall back to 3 seconds.
stopWait time.Duration
}
// startTunnel starts a server and a client over a freshly registered
// in-memory carrier using the datachannel transport, waits until the client
// reports ready, and returns a handle to the running pair.
//
// The server is started first; the client is only started once one stream in
// the memory room is connected. Both goroutines run under a context that is
// cancelled by a test cleanup as well as by tunnelRuntime.stop.
func startTunnel(t *testing.T) *tunnelRuntime {
	t.Helper()
	carrierName, room := registerMemoryCarrier(t)
	ctx, cancel := context.WithCancel(context.Background())
	t.Cleanup(cancel)
	socksAddr := freeLocalAddr(ctx, t)

	// Buffered so the goroutine can always deliver its result and exit, even
	// if nobody reads it.
	serverErr := make(chan error, 1)
	go func() {
		serverErr <- server.Run(ctx, server.Config{
			Transport: transportData,
			Carrier:   carrierName,
			RoomURL:   testRoom,
			KeyHex:    testKeyHex,
			DNSServer: localDNSServer,
		})
	}()
	room.waitConnected(t, 1)

	ready := make(chan struct{})
	// Guard the close: tests built on this helper trigger reconnects, and a
	// second close of ready would panic if the client ever signals readiness
	// again (whether it does is not visible from this file).
	var readyOnce sync.Once
	clientErr := make(chan error, 1)
	go func() {
		clientErr <- client.RunWithReady(ctx, client.Config{
			Transport: transportData,
			Carrier:   carrierName,
			RoomURL:   testRoom,
			KeyHex:    testKeyHex,
			DeviceID:  testClientDeviceID,
			LocalAddr: socksAddr,
			DNSServer: localDNSServer,
		}, func() { readyOnce.Do(func() { close(ready) }) })
	}()
	waitForReady(t, ready)

	return &tunnelRuntime{
		socksAddr: socksAddr,
		room:      room,
		cancel:    cancel,
		serverErr: serverErr,
		clientErr: clientErr,
		stopWait:  3 * time.Second,
	}
}
// startRealTunnel starts a server and a client against a real carrier and
// waits for the client to report ready.
//
// The fourth string parameter is accepted for call-site compatibility and
// ignored. Server and client share a per-call channel ID built from the PID
// and the current time, and both get the options from e2eTransportOptions.
//
// It returns an error — after cancelling the run context — when the server
// or client exits early, when ctx ends, or when the client is not ready
// within -olcrtc.real-timeout (errRealE2ENotReady).
func startRealTunnel(
	ctx context.Context,
	t *testing.T,
	carrierName, transportName, roomURL, _, clientDeviceID string,
) (*tunnelRuntime, error) {
	t.Helper()
	session.RegisterDefaults()
	socksAddr := freeLocalAddr(ctx, t)
	channelID := fmt.Sprintf("e2e-%d-%d", os.Getpid(), time.Now().UnixNano())
	runCtx, cancel := context.WithCancel(ctx)
	t.Cleanup(cancel)

	// Buffered so the goroutine can always deliver its result and exit.
	serverErr := make(chan error, 1)
	go func() {
		serverErr <- server.Run(runCtx, server.Config{
			Transport:        transportName,
			Carrier:          carrierName,
			RoomURL:          roomURL,
			ChannelID:        channelID,
			KeyHex:           testKeyHex,
			DNSServer:        localDNSServer,
			TransportOptions: e2eTransportOptions(transportName),
		})
	}()

	// Give the server a fixed 2s head start; an exit within that window is
	// reported as a startup failure.
	select {
	case err := <-serverErr:
		cancel()
		return nil, fmt.Errorf("server exited before client start: %w", err)
	case <-time.After(2 * time.Second):
	case <-runCtx.Done():
		cancel()
		return nil, fmt.Errorf("server context ended before client start: %w", runCtx.Err())
	}

	ready := make(chan struct{})
	// Guard the close so a repeated readiness signal (for example after a
	// reconnect — whether the client does that is not visible from this
	// file) cannot panic with "close of closed channel".
	var readyOnce sync.Once
	clientErr := make(chan error, 1)
	go func() {
		clientErr <- client.RunWithReady(runCtx, client.Config{
			Transport:        transportName,
			Carrier:          carrierName,
			RoomURL:          roomURL,
			ChannelID:        channelID,
			KeyHex:           testKeyHex,
			DeviceID:         clientDeviceID,
			LocalAddr:        socksAddr,
			DNSServer:        localDNSServer,
			TransportOptions: e2eTransportOptions(transportName),
		}, func() { readyOnce.Do(func() { close(ready) }) })
	}()

	select {
	case <-ready:
	case err := <-clientErr:
		cancel()
		return nil, fmt.Errorf("client exited before ready: %w", err)
	case err := <-serverErr:
		cancel()
		return nil, fmt.Errorf("server exited before client ready: %w", err)
	case <-time.After(*realE2ETimeout):
		cancel()
		return nil, errRealE2ENotReady
	case <-runCtx.Done():
		cancel()
		return nil, fmt.Errorf("real e2e context ended before ready: %w", runCtx.Err())
	}

	return &tunnelRuntime{
		socksAddr: socksAddr,
		cancel:    cancel,
		serverErr: serverErr,
		clientErr: clientErr,
		stopWait:  20 * time.Second,
	}, nil
}
// stop cancels the tunnel and fails the test if either side returned an
// error or did not stop in time.
func (r *tunnelRuntime) stop(t *testing.T) {
	t.Helper()
	err := r.stopErr()
	if err == nil {
		return
	}
	t.Fatal(err)
}

// waitStopped is stop without the cancel: it only waits for both sides and
// fails the test on error or timeout.
func (r *tunnelRuntime) waitStopped(t *testing.T) {
	t.Helper()
	err := r.waitStoppedErr()
	if err == nil {
		return
	}
	t.Fatal(err)
}

// stopErr cancels the tunnel context and then waits for both sides.
func (r *tunnelRuntime) stopErr() error {
	r.cancel()
	return r.waitStoppedErr()
}

// waitStoppedErr waits for the client and then the server goroutine to
// report its result, allowing stopWait (3s when unset) for each. It returns
// the first non-nil result, wrapped with the side's name, or a wrapped
// errTunnelDidNotStop naming the side that timed out.
func (r *tunnelRuntime) waitStoppedErr() error {
	limit := r.stopWait
	if limit <= 0 {
		limit = 3 * time.Second
	}

	await := func(side string, result <-chan error) error {
		select {
		case err := <-result:
			if err != nil {
				return fmt.Errorf("%s returned error: %w", side, err)
			}
			return nil
		case <-time.After(limit):
			return fmt.Errorf("%w: %s", errTunnelDidNotStop, side)
		}
	}

	if err := await("client", r.clientErr); err != nil {
		return err
	}
	return await("server", r.serverErr)
}
// connectViaSOCKS dials the SOCKS5 proxy at socksAddr, performs a no-auth
// greeting followed by a CONNECT to targetAddr, and returns the established
// connection. Any failure closes the connection and fails the test.
//
// targetAddr must be "host:port" with an IPv4 literal host and a port that
// fits in 16 bits: the request is always encoded with ATYP=1 (IPv4). The
// proxy must answer with exactly 05 00 00 01 followed by an all-zero bound
// address and port.
func connectViaSOCKS(t *testing.T, socksAddr, targetAddr string) net.Conn {
	t.Helper()
	dialer := net.Dialer{Timeout: 2 * time.Second}
	conn, err := dialer.DialContext(context.Background(), "tcp4", socksAddr)
	if err != nil {
		t.Fatalf("dial socks: %v", err)
	}
	// fail releases the connection before aborting the test.
	fail := func(format string, args ...any) {
		t.Helper()
		_ = conn.Close()
		t.Fatalf(format, args...)
	}

	// Greeting: version 5, one method offered, "no authentication".
	if _, err := conn.Write([]byte{5, 1, 0}); err != nil {
		fail("write socks greeting: %v", err)
	}
	greeting := make([]byte, 2)
	if _, err := io.ReadFull(conn, greeting); err != nil {
		fail("read socks greeting: %v", err)
	}
	if !bytes.Equal(greeting, []byte{5, 0}) {
		fail("socks greeting = %v, want [5 0]", greeting)
	}

	host, portText, err := net.SplitHostPort(targetAddr)
	if err != nil {
		fail("split target addr: %v", err)
	}
	// A hostname or IPv6 literal would otherwise yield a nil address and a
	// silently truncated request.
	ip4 := net.ParseIP(host).To4()
	if ip4 == nil {
		fail("target host %q is not an IPv4 address", host)
	}
	// Parsing with a 16-bit limit rejects out-of-range ports instead of
	// truncating them.
	port, err := strconv.ParseUint(portText, 10, 16)
	if err != nil {
		fail("parse target port: %v", err)
	}

	// Request: version 5, CONNECT, reserved, ATYP IPv4, address, port.
	req := make([]byte, 0, 10)
	req = append(req, 5, 1, 0, 1)
	req = append(req, ip4...)
	req = binary.BigEndian.AppendUint16(req, uint16(port))
	if _, err := conn.Write(req); err != nil {
		fail("write socks connect: %v", err)
	}

	reply := make([]byte, 10)
	if _, err := io.ReadFull(conn, reply); err != nil {
		fail("read socks connect reply: %v", err)
	}
	if !bytes.Equal(reply, []byte{5, 0, 0, 1, 0, 0, 0, 0, 0, 0}) {
		fail("socks reply = %v, want success", reply)
	}
	return conn
}
// TestBuiltInProviderTransportMatrixValidates checks that validSessionConfig
// passes session.Validate for every mode × built-in carrier × built-in
// transport combination, as nested mode/carrier/transport subtests.
func TestBuiltInProviderTransportMatrixValidates(t *testing.T) {
	session.RegisterDefaults()

	modes := [...]string{"srv", "cnc"}
	carriers := builtInCarrierNames()
	transports := builtInTransportNames()

	for _, mode := range modes {
		t.Run(mode, func(t *testing.T) {
			for _, carrierName := range carriers {
				t.Run(carrierName, func(t *testing.T) {
					for _, transportName := range transports {
						t.Run(transportName, func(t *testing.T) {
							err := session.Validate(validSessionConfig(mode, carrierName, transportName))
							if err != nil {
								t.Fatalf("Validate() error = %v", err)
							}
						})
					}
				})
			}
		})
	}
}
// TestTransportCreatesAllProviderTransportCombinations registers an in-memory
// carrier under every built-in carrier name and checks that transport.New
// followed by Close succeeds for every carrier × transport combination.
func TestTransportCreatesAllProviderTransportCombinations(t *testing.T) {
	session.RegisterDefaults()

	carriers := builtInCarrierNames()
	for _, name := range carriers {
		registerMemoryCarrierAs(t, name)
	}

	transports := builtInTransportNames()
	for _, carrierName := range carriers {
		t.Run(carrierName, func(t *testing.T) {
			for _, transportName := range transports {
				t.Run(transportName, func(t *testing.T) {
					cfg := validTransportConfig(carrierName, transportName)
					created, err := transport.New(context.Background(), transportName, cfg)
					if err != nil {
						t.Fatalf("transport.New() error = %v", err)
					}
					if closeErr := created.Close(); closeErr != nil {
						t.Fatalf("Close() error = %v", closeErr)
					}
				})
			}
		})
	}
}
// TestTransportConnectsFastProviderTransportMatrix registers an in-memory
// carrier under every built-in carrier name and, for the datachannel and
// seichannel transports, checks that New, Connect and Close succeed and that
// CanSend has the expected value right after Connect.
func TestTransportConnectsFastProviderTransportMatrix(t *testing.T) {
	session.RegisterDefaults()

	carriers := builtInCarrierNames()
	for _, name := range carriers {
		registerMemoryCarrierAs(t, name)
	}

	fastTransports := [...]string{transportData, transportSEI}
	for _, carrierName := range carriers {
		t.Run(carrierName, func(t *testing.T) {
			for _, transportName := range fastTransports {
				t.Run(transportName, func(t *testing.T) {
					cfg := validTransportConfig(carrierName, transportName)
					created, err := transport.New(context.Background(), transportName, cfg)
					if err != nil {
						t.Fatalf("transport.New() error = %v", err)
					}
					if connectErr := created.Connect(context.Background()); connectErr != nil {
						t.Fatalf("Connect() error = %v", connectErr)
					}
					assertTransportCanSendAfterConnect(t, created, transportName)
					if closeErr := created.Close(); closeErr != nil {
						t.Fatalf("Close() error = %v", closeErr)
					}
				})
			}
		})
	}
}
// assertTransportCanSendAfterConnect checks CanSend right after Connect:
// seichannel must report false (no peer frame has been seen yet), every other
// transport must report true.
func assertTransportCanSendAfterConnect(t *testing.T, tr transport.Transport, transportName string) {
	t.Helper()

	canSend := tr.CanSend()
	isSEI := transportName == transportSEI
	switch {
	case isSEI && canSend:
		t.Fatal("CanSend() = true before peer seichannel frame")
	case !isSEI && !canSend:
		t.Fatal("CanSend() = false, want true")
	}
}
// TestRealProviderTransportMatrix runs every selected carrier × transport
// combination against the real external services and compares the outcome
// with realE2ECaseExpectation. It is skipped unless -olcrtc.real-e2e is set.
//
//nolint:cyclop // table-driven test naturally has many branches
func TestRealProviderTransportMatrix(t *testing.T) {
if !*realE2E {
t.Skip("real provider e2e disabled; pass -olcrtc.real-e2e with provider room flags")
}
carriers := splitTestList(*realE2ECarriers)
transports := splitTestList(*realE2ETransports)
if len(carriers) == 0 {
t.Fatal("no real e2e carriers selected")
}
if len(transports) == 0 {
t.Fatal("no real e2e transports selected")
}
// One echo server is shared by all cases.
echoAddr := startEchoServer(t)
for _, carrierName := range carriers {
t.Run(carrierName, func(t *testing.T) {
// The room is resolved once per carrier and reused for every transport.
roomCtx, cancelRoom := context.WithTimeout(context.Background(), *realE2ETimeout)
defer cancelRoom()
roomURL := requireRealRoom(roomCtx, t, carrierName)
// authFailed is shared by the transport subtests below, which run one
// after another: once a case hits carrier.ErrAuthFailed the remaining
// transports of this carrier are skipped.
var authFailed bool
for _, transportName := range transports {
t.Run(transportName, func(t *testing.T) {
if authFailed {
t.Skip("skipping: carrier auth failed on previous transport")
}
expectation := realE2ECaseExpectation(carrierName, transportName)
label := realE2EExpectationLabel(expectation)
err := runRealE2ECase(t, carrierName, transportName, roomURL, echoAddr)
if err != nil && errors.Is(err, carrier.ErrAuthFailed) {
authFailed = true
t.Skipf("skip %s real e2e: auth failed: %v", carrierName, err)
}
// Pass/Fail expectations are enforced; Unstable is only logged, which is
// why it is matched last, after all four Pass/Fail combinations.
switch {
case err == nil && expectation == realE2EExpectPass:
t.Logf("%s %s/%s", label, carrierName, transportName)
case err == nil && expectation == realE2EExpectFail:
t.Fatalf("UNEXPECTED SUCCESS %s/%s", carrierName, transportName)
case err != nil && expectation == realE2EExpectPass:
t.Fatalf("EXPECTED SUCCESS %s/%s failed: %v", carrierName, transportName, err)
case err != nil && expectation == realE2EExpectFail:
t.Logf("%s %s/%s: %v", label, carrierName, transportName, err)
case expectation == realE2EExpectUnstable:
logUnstableOutcome(t, label, carrierName, transportName, err)
}
})
}
})
}
}
// runRealE2ECase runs one real carrier/transport case: it starts a real
// tunnel, connects to echoAddr through the tunnel's SOCKS endpoint, sends one
// newline-terminated payload and expects the identical line back. Every step
// is bounded by -olcrtc.real-timeout.
//
// The result is named so the deferred tunnel stop can report a shutdown error
// when the case itself succeeded; an earlier error is never overwritten.
func runRealE2ECase(t *testing.T, carrierName, transportName, roomURL, echoAddr string) (err error) {
t.Helper()
ctx, cancel := context.WithTimeout(context.Background(), *realE2ETimeout)
defer cancel()
// testClientDeviceID is passed twice; startRealTunnel ignores the first of
// the two device-id arguments.
rt, err := startRealTunnel(ctx, t, carrierName, transportName, roomURL, testClientDeviceID, testClientDeviceID)
if err != nil {
return err
}
defer func() {
if stopErr := rt.stopErr(); err == nil && stopErr != nil {
err = stopErr
}
}()
conn, err := connectViaSOCKSWithin(rt.socksAddr, echoAddr, *realE2ETimeout)
if err != nil {
return err
}
defer func() { _ = conn.Close() }()
// The payload names the case, so the echoed line is specific to it.
payload := []byte("olcrtc-real-e2e-" + carrierName + "-" + transportName + "\n")
if _, err := conn.Write(payload); err != nil {
return fmt.Errorf("write real e2e payload: %w", err)
}
if err := conn.SetReadDeadline(time.Now().Add(*realE2ETimeout)); err != nil {
return fmt.Errorf("set real e2e read deadline: %w", err)
}
line, err := bufio.NewReader(conn).ReadBytes('\n')
if err != nil {
return fmt.Errorf("read real e2e echo: %w", err)
}
if !bytes.Equal(line, payload) {
return fmt.Errorf("%w: got %q, want %q", errRealE2EEchoMismatch, line, payload)
}
return nil
}
// TestClientServerSOCKSTunnelOverMemoryDatachannel sends one line to an echo
// server through the client's SOCKS endpoint over the in-memory datachannel
// tunnel and expects the same line back within 3 seconds.
func TestClientServerSOCKSTunnelOverMemoryDatachannel(t *testing.T) {
	echoAddr := startEchoServer(t)
	rt := startTunnel(t)
	defer rt.stop(t)

	conn := connectViaSOCKS(t, rt.socksAddr, echoAddr)
	defer func() { _ = conn.Close() }()

	want := []byte("olcrtc-e2e-payload\n")
	_, err := conn.Write(want)
	if err != nil {
		t.Fatalf("write tunneled payload: %v", err)
	}

	deadline := time.Now().Add(3 * time.Second)
	if err = conn.SetReadDeadline(deadline); err != nil {
		t.Fatalf("set read deadline: %v", err)
	}

	reader := bufio.NewReader(conn)
	echoed, err := reader.ReadBytes('\n')
	if err != nil {
		t.Fatalf("read tunneled echo: %v", err)
	}
	if !bytes.Equal(echoed, want) {
		t.Fatalf("echo = %q, want %q", echoed, want)
	}
}
// TestFrequentReconnectsStillAllowNewSOCKSConnections triggers a reconnect on
// the test room five times in a row and, after each one, checks that a fresh
// SOCKS connection can still be opened and echoes a payload.
func TestFrequentReconnectsStillAllowNewSOCKSConnections(t *testing.T) {
	echoAddr := startEchoServer(t)
	tunnel := startTunnel(t)
	defer tunnel.stop(t)

	// roundTrip performs one reconnect followed by one echo exchange on a new
	// connection. The connection is closed explicitly on every path because
	// it only lives for a single round.
	roundTrip := func(round int) {
		tunnel.room.triggerReconnect()
		conn := eventuallyConnectViaSOCKS(t, tunnel.socksAddr, echoAddr)
		want := fmt.Appendf(nil, "after-reconnect-%d\n", round)

		if _, err := conn.Write(want); err != nil {
			_ = conn.Close()
			t.Fatalf("write after reconnect %d: %v", round, err)
		}
		if err := conn.SetReadDeadline(time.Now().Add(3 * time.Second)); err != nil {
			_ = conn.Close()
			t.Fatalf("set deadline after reconnect %d: %v", round, err)
		}

		got, err := bufio.NewReader(conn).ReadBytes('\n')
		_ = conn.Close()
		switch {
		case err != nil:
			t.Fatalf("read after reconnect %d: %v", round, err)
		case !bytes.Equal(got, want):
			t.Fatalf("echo after reconnect %d = %q, want %q", round, got, want)
		}
	}

	for round := range 5 {
		roundTrip(round)
	}
}
// TestSupervisorFailoverProfilesReachWorkingSOCKS runs one supervisor for the
// server side and one for the client side. Each is given a profile that uses
// the failing carrier followed by a profile that uses the in-memory carrier.
// The test then sends a payload through the client's SOCKS address, expects it
// to be echoed back, and finally asserts that all four profiles were started
// and that both supervisors stop without error once the context is cancelled.
func TestSupervisorFailoverProfilesReachWorkingSOCKS(t *testing.T) {
echoAddr := startEchoServer(t)
failingCarrier := registerFailingCarrier(t)
memoryCarrier, room := registerMemoryCarrier(t)
ctx, cancel := context.WithCancel(context.Background())
// Cancel on cleanup as well, so the supervisor goroutines are told to stop
// even when the test aborts before the explicit cancel() at the end.
t.Cleanup(cancel)
socksAddr := freeLocalAddr(ctx, t)
socksHost, socksPort := splitHostPort(t, socksAddr)
// The failing profile is listed first on both sides; the memory profile is
// the one expected to end up serving traffic.
serverProfiles := []supervisor.Profile{
{Name: "failing-server", Config: failoverSessionConfig("srv", failingCarrier, "", 0)},
{Name: "memory-server", Config: failoverSessionConfig("srv", memoryCarrier, "", 0)},
}
clientProfiles := []supervisor.Profile{
{Name: "failing-client", Config: failoverSessionConfig("cnc", failingCarrier, socksHost, socksPort)},
{Name: "memory-client", Config: failoverSessionConfig("cnc", memoryCarrier, socksHost, socksPort)},
}
// Buffered: failoverE2EConfig's OnProfileStart hook sends without blocking
// and drops the notification when the channel is full.
started := make(chan string, 8)
serverErr := make(chan error, 1)
go func() {
serverErr <- supervisor.Run(ctx, failoverE2EConfig(serverProfiles, started, "server"), session.Run)
}()
// Presumably blocks until one participant has joined the in-memory room,
// i.e. the server side is up on the memory carrier (confirm against
// waitConnected).
room.waitConnected(t, 1)
ready := make(chan struct{})
var readyOnce sync.Once
clientErr := make(chan error, 1)
go func() {
// The client side is run through client.RunWithReady rather than
// session.Run so the test receives a callback. ready is closed at most
// once, and only for the profile whose carrier is the memory carrier.
runClientProfile := func(ctx context.Context, cfg session.Config) error {
return client.RunWithReady(ctx, clientConfigFromSession(cfg, socksAddr), func() {
if cfg.Auth == memoryCarrier {
readyOnce.Do(func() { close(ready) })
}
})
}
clientErr <- supervisor.Run(ctx, failoverE2EConfig(clientProfiles, started, "client"), runClientProfile)
}()
waitForReady(t, ready)
conn := eventuallyConnectViaSOCKS(t, socksAddr, echoAddr)
defer func() { _ = conn.Close() }()
payload := []byte("olcrtc-failover-e2e\n")
if _, err := conn.Write(payload); err != nil {
t.Fatalf("write failover payload: %v", err)
}
if err := conn.SetReadDeadline(time.Now().Add(3 * time.Second)); err != nil {
t.Fatalf("set failover read deadline: %v", err)
}
line, err := bufio.NewReader(conn).ReadBytes('\n')
if err != nil {
t.Fatalf("read failover echo: %v", err)
}
if !bytes.Equal(line, payload) {
t.Fatalf("failover echo = %q, want %q", line, payload)
}
// Both the failing and the working profile must have been started on each
// side.
requireStartedProfiles(t, started, []string{
"server:failing-server",
"server:memory-server",
"client:failing-client",
"client:memory-client",
})
// Shut both supervisors down and require that each returns a nil error
// within waitSupervisorStopped's timeout.
cancel()
waitSupervisorStopped(t, "client", clientErr)
waitSupervisorStopped(t, "server", serverErr)
}
// failoverSessionConfig builds the session configuration used by the failover
// test for the given mode and carrier. The SOCKS host and port are applied
// only when mode is "cnc"; for any other mode they are left at their zero
// values regardless of the arguments.
func failoverSessionConfig(mode, carrierName, socksHost string, socksPort int) session.Config {
	var (
		host string
		port int
	)
	if mode == "cnc" {
		host, port = socksHost, socksPort
	}
	return session.Config{
		Mode:      mode,
		Transport: transportData,
		Auth:      carrierName,
		RoomID:    testRoom,
		KeyHex:    testKeyHex,
		DNSServer: localDNSServer,
		SOCKSHost: host,
		SOCKSPort: port,
	}
}
// clientConfigFromSession maps a session configuration onto the client
// configuration. The session's Auth value becomes the client's Carrier and
// its RoomID becomes the RoomURL; socksAddr is used as the local listen
// address and the test client device ID is always applied.
func clientConfigFromSession(cfg session.Config, socksAddr string) client.Config {
	var out client.Config

	// Fields copied from the session configuration.
	out.Transport = cfg.Transport
	out.Carrier = cfg.Auth
	out.RoomURL = cfg.RoomID
	out.KeyHex = cfg.KeyHex
	out.DNSServer = cfg.DNSServer
	out.Engine = cfg.Engine
	out.URL = cfg.URL
	out.Token = cfg.Token

	// Fields supplied by the test harness.
	out.LocalAddr = socksAddr
	out.DeviceID = testClientDeviceID
	out.TransportOptions = e2eTransportOptions(cfg.Transport)

	return out
}
// failoverE2EConfig returns a supervisor configuration for the given profiles
// with a one-millisecond retry delay. Every profile start is reported on
// started as "<side>:<profile name>"; the report is dropped rather than
// blocking when the channel cannot accept it.
func failoverE2EConfig(
	profiles []supervisor.Profile,
	started chan<- string,
	side string,
) supervisor.Config {
	prefix := side + ":"
	notify := func(profile supervisor.Profile, _ int) {
		// Non-blocking send: the start hook must never stall on the test's
		// channel.
		select {
		case started <- prefix + profile.Name:
		default:
		}
	}
	return supervisor.Config{
		Profiles:       profiles,
		RetryDelay:     time.Millisecond,
		OnProfileStart: notify,
	}
}
// splitHostPort splits addr into its host and its numeric port. The test is
// failed immediately when addr is not a valid host:port pair or when the port
// is not an integer.
func splitHostPort(t *testing.T, addr string) (string, int) {
	t.Helper()

	host, portText, splitErr := net.SplitHostPort(addr)
	if splitErr != nil {
		t.Fatalf("split host port %q: %v", addr, splitErr)
	}

	port, convErr := strconv.Atoi(portText)
	if convErr != nil {
		t.Fatalf("parse port %q: %v", portText, convErr)
	}

	return host, port
}
// requireStartedProfiles waits until every entry of want has been received on
// started, failing the test if that does not happen within three seconds.
//
// Only entries listed in want count towards completion. Unrelated or repeated
// notifications are recorded for the failure message but neither end the wait
// early nor keep it from finishing, and a name listed twice in want needs to
// be observed only once.
func requireStartedProfiles(t *testing.T, started <-chan string, want []string) {
	t.Helper()

	pending := make(map[string]struct{}, len(want))
	for _, item := range want {
		pending[item] = struct{}{}
	}

	seen := make(map[string]bool)
	deadline := time.After(3 * time.Second)
	for len(pending) > 0 {
		select {
		case item := <-started:
			seen[item] = true
			delete(pending, item)
		case <-deadline:
			t.Fatalf("started profiles = %v, want all %v", seen, want)
		}
	}
}
// waitSupervisorStopped waits up to three seconds for a result on ch. The
// test fails when nothing arrives in time or when the received error is
// non-nil; name identifies the supervisor in the failure message.
func waitSupervisorStopped(t *testing.T, name string, ch <-chan error) {
	t.Helper()

	var runErr error
	select {
	case runErr = <-ch:
	case <-time.After(3 * time.Second):
		// Fatalf does not return, so the error check below is skipped.
		t.Fatalf("%s supervisor did not stop", name)
	}

	if runErr != nil {
		t.Fatalf("%s supervisor returned error: %v", name, runErr)
	}
}
// TestEndedCallbackStopsClientAndServer signals "conference ended" on the
// tunnel's room and then waits for the tunnel to report that it has stopped.
func TestEndedCallbackStopsClientAndServer(t *testing.T) {
	tunnel := startTunnel(t)

	const reason = "conference ended"
	tunnel.room.triggerEnded(reason)

	tunnel.waitStopped(t)
}
// eventuallyConnectViaSOCKS is eventuallyConnectViaSOCKSWithin with a fixed
// three-second budget.
func eventuallyConnectViaSOCKS(t *testing.T, socksAddr, targetAddr string) net.Conn {
	t.Helper()

	const defaultBudget = 3 * time.Second
	return eventuallyConnectViaSOCKSWithin(t, socksAddr, targetAddr, defaultBudget)
}
// eventuallyConnectViaSOCKSWithin returns a connection to targetAddr opened
// through the SOCKS proxy at socksAddr, retrying via connectViaSOCKSWithin for
// up to timeout. The test is failed when no connection could be established.
func eventuallyConnectViaSOCKSWithin(t *testing.T, socksAddr, targetAddr string, timeout time.Duration) net.Conn {
	t.Helper()

	socksConn, connectErr := connectViaSOCKSWithin(socksAddr, targetAddr, timeout)
	if connectErr != nil {
		t.Fatal(connectErr)
	}

	return socksConn
}
// connectViaSOCKSWithin repeatedly tries to open a connection to targetAddr
// through the SOCKS proxy at socksAddr until an attempt succeeds or timeout
// has elapsed.
//
// At least one attempt is always made, even for a zero or negative timeout,
// so the returned error always wraps a real attempt error. The pause between
// attempts is 250ms after each of the first three failures and one second
// after that, but is never longer than the time left, so the function does
// not sleep past its deadline; a final attempt is made once the deadline is
// reached.
func connectViaSOCKSWithin(socksAddr, targetAddr string, timeout time.Duration) (net.Conn, error) {
	deadline := time.Now().Add(timeout)
	for attempt := 1; ; attempt++ {
		conn, err := tryConnectViaSOCKS(socksAddr, targetAddr)
		if err == nil {
			return conn, nil
		}

		remaining := time.Until(deadline)
		if remaining <= 0 {
			return nil, fmt.Errorf("connect via SOCKS failed after %s: %w", timeout, err)
		}

		backoff := 250 * time.Millisecond
		if attempt > 3 {
			backoff = time.Second
		}
		time.Sleep(min(backoff, remaining))
	}
}
// tryConnectViaSOCKS makes a single attempt to open a stream to targetAddr
// through the SOCKS5 proxy listening on socksAddr and returns the connection
// ready for payload traffic.
//
// targetAddr must be an IPv4 "host:port" literal: the request is always sent
// with the IPv4 address type, so a hostname or IPv6 target is rejected with an
// error instead of producing a truncated request. On any failure after the
// dial the connection is closed before returning.
func tryConnectViaSOCKS(socksAddr, targetAddr string) (_ net.Conn, err error) {
	dialer := net.Dialer{Timeout: 500 * time.Millisecond}
	conn, err := dialer.DialContext(context.Background(), "tcp4", socksAddr)
	if err != nil {
		return nil, fmt.Errorf("dial socks: %w", err)
	}
	// Single cleanup point: every error return below sets err, which
	// releases the socket here.
	defer func() {
		if err != nil {
			_ = conn.Close()
		}
	}()

	// Greeting: version 5, one method offered, method 0 (no authentication).
	if _, err = conn.Write([]byte{5, 1, 0}); err != nil {
		return nil, fmt.Errorf("write greeting: %w", err)
	}
	greeting := make([]byte, 2)
	if _, err = io.ReadFull(conn, greeting); err != nil {
		return nil, fmt.Errorf("read greeting: %w", err)
	}
	if !bytes.Equal(greeting, []byte{5, 0}) {
		return nil, fmt.Errorf("%w: %v", errSocksUnexpectedHello, greeting)
	}

	host, portText, err := net.SplitHostPort(targetAddr)
	if err != nil {
		return nil, fmt.Errorf("split host port: %w", err)
	}
	port, err := strconv.Atoi(portText)
	if err != nil {
		return nil, fmt.Errorf("parse port: %w", err)
	}
	// To4 yields nil for anything that is not an IPv4 literal; appending
	// that would silently drop the address bytes from the request.
	ip4 := net.ParseIP(host).To4()
	if ip4 == nil {
		return nil, fmt.Errorf("parse target host: %w", &net.ParseError{Type: "IPv4 address", Text: host})
	}

	// Request: version 5, CONNECT (1), reserved 0, address type IPv4 (1),
	// followed by the 4-byte address and the big-endian port.
	req := make([]byte, 0, 10)
	req = append(req, 5, 1, 0, 1)
	req = append(req, ip4...)
	var portBuf [2]byte
	binary.BigEndian.PutUint16(portBuf[:], uint16(port)) //nolint:gosec // SOCKS5 port is uint16 by definition
	req = append(req, portBuf[:]...)
	if _, err = conn.Write(req); err != nil {
		return nil, fmt.Errorf("write connect request: %w", err)
	}

	// The only accepted reply is success with an all-zero IPv4 bound address.
	reply := make([]byte, 10)
	if _, err = io.ReadFull(conn, reply); err != nil {
		return nil, fmt.Errorf("read connect reply: %w", err)
	}
	if !bytes.Equal(reply, []byte{5, 0, 0, 1, 0, 0, 0, 0, 0, 0}) {
		return nil, fmt.Errorf("%w: %v", errSocksUnexpectedReply, reply)
	}
	return conn, nil
}
// TestLargeTransferOverTunnel streams 32 MiB of patterned data through the
// tunnel to an echo server and verifies the echoed bytes.
func TestLargeTransferOverTunnel(t *testing.T) {
	const transferSize int64 = 32 << 20

	target := startEchoServer(t)
	tunnel := startTunnel(t)
	defer tunnel.stop(t)

	socksConn := connectViaSOCKS(t, tunnel.socksAddr, target)
	defer func() { _ = socksConn.Close() }()

	err := streamPatternAndVerifyEcho(socksConn, transferSize)
	if err != nil {
		t.Fatalf("large transfer %d bytes failed: %v", transferSize, err)
	}
}
// streamPatternAndVerifyEcho writes size bytes of the fillPattern sequence to
// conn from a background goroutine while reading the echoed bytes back on the
// calling goroutine and comparing them, chunk by chunk, with the expected
// pattern.
//
// It returns nil when all size bytes were written and echoed correctly. A
// read or comparison failure is returned as soon as it occurs; if the writer
// had already failed by then, its error is joined in front of the read error
// so that the root cause is not hidden behind the read timeout it provokes.
// Each chunk read is bounded by a ten-second deadline.
func streamPatternAndVerifyEcho(conn net.Conn, size int64) error {
	const chunkSize = 32 * 1024

	// Buffered so the writer can always report its result and exit, even
	// when the reader has already returned.
	writeDone := make(chan error, 1)
	go func() {
		buf := make([]byte, chunkSize)
		for written := int64(0); written < size; {
			n := int(min(int64(len(buf)), size-written))
			fillPattern(buf[:n], written)
			if _, err := conn.Write(buf[:n]); err != nil {
				writeDone <- fmt.Errorf("write at %d: %w", written, err)
				return
			}
			written += int64(n)
		}
		writeDone <- nil
	}()

	// verifyEcho reads the echoed stream and checks it against the pattern.
	verifyEcho := func() error {
		got := make([]byte, chunkSize)
		want := make([]byte, chunkSize)
		for read := int64(0); read < size; {
			n := int(min(int64(len(got)), size-read))
			if err := conn.SetReadDeadline(time.Now().Add(10 * time.Second)); err != nil {
				return fmt.Errorf("set read deadline: %w", err)
			}
			if _, err := io.ReadFull(conn, got[:n]); err != nil {
				return fmt.Errorf("read at %d: %w", read, err)
			}
			fillPattern(want[:n], read)
			if !bytes.Equal(got[:n], want[:n]) {
				return fmt.Errorf("%w %d", errPayloadMismatchOffset, read)
			}
			read += int64(n)
		}
		return nil
	}

	if readErr := verifyEcho(); readErr != nil {
		// Do not wait for the writer here: it may still be blocked on the
		// connection. Only pick up a failure it has already reported.
		select {
		case writeErr := <-writeDone:
			if writeErr != nil {
				return errors.Join(writeErr, readErr)
			}
		default:
		}
		return readErr
	}
	return <-writeDone
}
// fillPattern fills buf with a deterministic byte sequence: the byte at index
// i is the low eight bits of offset + 31*i + 7.
func fillPattern(buf []byte, offset int64) {
	value := offset + 7
	for i := range buf {
		buf[i] = byte(value & 0xff)
		value += 31
	}
}