Files
olcrtc/internal/e2e/tunnel_test.go

1782 lines
49 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package e2e
import (
"bufio"
"bytes"
"context"
"crypto/rand"
"encoding/binary"
"encoding/hex"
"errors"
"flag"
"fmt"
"io"
"net"
"os"
"strconv"
"strings"
"sync"
"testing"
"time"
"github.com/openlibrecommunity/olcrtc/internal/app/session"
"github.com/openlibrecommunity/olcrtc/internal/client"
"github.com/openlibrecommunity/olcrtc/internal/engine"
enginebuiltin "github.com/openlibrecommunity/olcrtc/internal/engine/builtin"
"github.com/openlibrecommunity/olcrtc/internal/server"
"github.com/openlibrecommunity/olcrtc/internal/supervisor"
"github.com/openlibrecommunity/olcrtc/internal/transport"
"github.com/openlibrecommunity/olcrtc/internal/transport/seichannel"
"github.com/openlibrecommunity/olcrtc/internal/transport/videochannel"
"github.com/openlibrecommunity/olcrtc/internal/transport/vp8channel"
pioninterceptor "github.com/pion/interceptor"
"github.com/pion/webrtc/v4"
)
const (
testKeyHex = "00112233445566778899aabbccddeeff00112233445566778899aabbccddeeff"
transportData = "datachannel"
transportVideo = "videochannel"
transportSEI = "seichannel"
transportVP8 = "vp8channel"
testRoom = "room"
localDNSServer = "127.0.0.1:53"
videoHWNone = "none"
testClientDeviceID = "client-1"
defaultJitsiRoomURL = "https://meet1.arbitr.ru/deadbeef"
)
var (
errRealE2ENotReady = errors.New("real e2e client did not become ready")
errTunnelDidNotStop = errors.New("tunnel goroutine did not stop")
errRealE2EEchoMismatch = errors.New("real e2e echo payload mismatch")
errSocksUnexpectedReply = errors.New("unexpected SOCKS5 reply")
errSocksUnexpectedHello = errors.New("unexpected SOCKS5 greeting")
errPayloadMismatchOffset = errors.New("payload mismatch at offset")
errFailoverCarrier = errors.New("intentional failover carrier failure")
errMemoryLoopbackPCClosed = errors.New("memory loopback: PC closed during SDP exchange")
errServerExitedBeforeClientStart = errors.New("server exited cleanly before client start")
errClientExitedBeforeReady = errors.New("client exited cleanly before ready")
errServerExitedBeforeClientReady = errors.New("server exited cleanly before client ready")
)
var (
realE2E = flag.Bool( //nolint:gochecknoglobals // package-level state intentional
"olcrtc.real-e2e",
false,
"run real provider e2e matrix against external WebRTC services",
)
realE2ECarriers = flag.String( //nolint:gochecknoglobals // package-level state intentional
"olcrtc.real-carriers",
"jitsi,telemost,wbstream",
"comma-separated carriers for real e2e",
)
realE2ETransports = flag.String( //nolint:gochecknoglobals // package-level state intentional
"olcrtc.real-transports",
"datachannel,videochannel,seichannel,vp8channel",
"comma-separated transports for real e2e",
)
realE2ETelemostRoom = flag.String( //nolint:gochecknoglobals // package-level state intentional
"olcrtc.real-telemost-room",
"41514917109506",
"Telemost room URL or id for real e2e",
)
realE2EWBStreamRoom = flag.String( //nolint:gochecknoglobals // package-level state intentional
"olcrtc.real-wbstream-room",
"019e23c2-a580-7550-b08a-7ac5342ca21f",
"WB Stream room id for real e2e; autogenerated when empty",
)
realE2EJitsiRoom = flag.String( //nolint:gochecknoglobals // package-level state intentional
"olcrtc.real-jitsi-room",
defaultJitsiRoomURL,
"Jitsi Meet room URL for real e2e (format https://host/room or host/room); "+
"when left at default, a per-process random suffix is appended so concurrent "+
"test runs don't share a room",
)
realE2ETimeout = flag.Duration( //nolint:gochecknoglobals // package-level state intentional
"olcrtc.real-timeout",
90*time.Second,
"timeout per real e2e provider/transport case",
)
)
type realE2EExpectation int
const (
realE2EExpectFail realE2EExpectation = iota
realE2EExpectPass
// realE2EExpectUnstable marks a carrier×transport combo that is
// known to flap: it sometimes succeeds and sometimes fails for
// reasons outside our control (third-party server load, lossy SFU
// paths, etc.). The matrix runner records the outcome but does
// not fail the test either way. Use this sparingly - prefer
// ExpectPass / ExpectFail when the behaviour is deterministic.
realE2EExpectUnstable
)
// memoryStream is registered as an engine.Session directly: it implements
// every Session method plus engine.VideoTrackCapable (AddVideoTrack /
// SetVideoTrackHandler aliases below). The wrapper that used to live in
// memorySession is no longer needed after the carrier-layer collapse.
type memoryRoom struct {
mu sync.Mutex
streams map[*memoryStream]struct{}
}
func (r *memoryRoom) connectedCount() int {
r.mu.Lock()
defer r.mu.Unlock()
count := 0
for stream := range r.streams {
if stream.isConnected() {
count++
}
}
return count
}
func (r *memoryRoom) waitConnected(t *testing.T, want int) {
t.Helper()
deadline := time.Now().Add(3 * time.Second)
for time.Now().Before(deadline) {
if r.connectedCount() >= want {
return
}
time.Sleep(10 * time.Millisecond)
}
t.Fatalf("memory room connected streams = %d, want at least %d", r.connectedCount(), want)
}
func (r *memoryRoom) triggerReconnect() {
r.mu.Lock()
streams := make([]*memoryStream, 0, len(r.streams))
for stream := range r.streams {
streams = append(streams, stream)
}
r.mu.Unlock()
var wg sync.WaitGroup
for _, stream := range streams {
wg.Add(1)
go func() {
defer wg.Done()
stream.triggerReconnect()
}()
}
wg.Wait()
}
func (r *memoryRoom) triggerEnded(reason string) {
r.mu.Lock()
streams := make([]*memoryStream, 0, len(r.streams))
for stream := range r.streams {
streams = append(streams, stream)
}
r.mu.Unlock()
for _, stream := range streams {
stream.triggerEnded(reason)
}
}
// peerOf returns the other stream in a 2-party room or nil if there is
// no peer (yet). Video loopback relies on a single 1:1 partner so we
// just pick the first non-self stream we see.
func (r *memoryRoom) peerOf(s *memoryStream) *memoryStream {
r.mu.Lock()
defer r.mu.Unlock()
for stream := range r.streams {
if stream != s {
return stream
}
}
return nil
}
// isFirstStream reports whether s is the only stream currently in the
// room. Used to deterministically pick the SDP offerer: the first
// stream to register builds its PC first and becomes the offerer.
func (r *memoryRoom) isFirstStream(s *memoryStream) bool {
r.mu.Lock()
defer r.mu.Unlock()
if len(r.streams) == 0 {
return true
}
if _, ok := r.streams[s]; ok && len(r.streams) == 1 {
return true
}
return false
}
type memoryStream struct {
room *memoryRoom
onData func([]byte)
mu sync.Mutex
connected bool
closed bool
reconnect func()
ended func(string)
track webrtc.TrackLocal
trackCB func(*webrtc.TrackRemote, *webrtc.RTPReceiver)
pending [][]byte
// Video-track plumbing. Until AddVideoTrack or
// SetVideoTrackHandler is called the embedded *webrtc.PeerConnection
// stays nil and memoryStream behaves as the original byte-only
// carrier. Once a video transport touches us we lazily build a real
// pion PC; SDP/ICE exchange with the peer stream is plumbed through
// the parent room, so two streams in the same room loopback their
// video tracks through a real (in-process) WebRTC stack.
//
// streamCtx is owned by the stream itself (cancelled in Close), not
// by the short-lived ctx that Connect receives - the video
// transport's connectCtx fires its deferred cancel as soon as
// streamTransport.Connect returns, which would otherwise tear down
// the async SDP negotiation goroutine before it can find its peer.
streamCtx context.Context //nolint:containedctx // owned lifecycle ctx for the loopback PC
streamCancel context.CancelFunc
pcMu sync.Mutex
pc *webrtc.PeerConnection
isOfferer bool
negotiateOnce sync.Once
pendingICE []webrtc.ICECandidateInit
remoteDescSet bool
}
func (s *memoryStream) Connect(_ context.Context) error {
s.mu.Lock()
s.connected = true
pending := s.pending
s.pending = nil
onData := s.onData
s.mu.Unlock()
for _, payload := range pending {
if onData != nil {
onData(payload)
}
}
// If a video transport already touched us, kick off the offerer-side
// SDP negotiation asynchronously. Connect itself stays fast so that
// room.waitConnected (which only checks that the byte-channel side is
// up) doesn't deadlock waiting for a peer that hasn't joined yet.
s.pcMu.Lock()
needNegotiate := s.pc != nil && s.isOfferer
s.pcMu.Unlock()
if needNegotiate {
go s.negotiateOnce.Do(func() { s.runNegotiation(s.streamCtx) })
}
return nil
}
func (s *memoryStream) Send(data []byte) error {
s.mu.Lock()
if s.closed {
s.mu.Unlock()
return io.ErrClosedPipe
}
s.mu.Unlock()
payload := append([]byte(nil), data...)
s.room.mu.Lock()
peers := make([]*memoryStream, 0, len(s.room.streams))
for peer := range s.room.streams {
if peer != s {
peers = append(peers, peer)
}
}
s.room.mu.Unlock()
for _, peer := range peers {
peer.deliver(payload)
}
return nil
}
func (s *memoryStream) deliver(data []byte) {
s.mu.Lock()
if !s.connected && !s.closed {
s.pending = append(s.pending, append([]byte(nil), data...))
s.mu.Unlock()
return
}
ready := !s.closed && s.onData != nil
onData := s.onData
s.mu.Unlock()
if ready {
onData(append([]byte(nil), data...))
}
}
func (s *memoryStream) Close() error {
s.mu.Lock()
s.closed = true
s.connected = false
s.mu.Unlock()
if s.streamCancel != nil {
s.streamCancel()
}
s.pcMu.Lock()
pc := s.pc
s.pc = nil
s.pcMu.Unlock()
if pc != nil {
_ = pc.Close()
}
return nil
}
func (s *memoryStream) SetReconnectCallback(cb func(*webrtc.DataChannel)) {
s.mu.Lock()
if cb == nil {
s.reconnect = nil
} else {
s.reconnect = func() { cb(nil) }
}
s.mu.Unlock()
}
func (s *memoryStream) SetShouldReconnect(func() bool) {}
func (s *memoryStream) SetEndedCallback(cb func(string)) {
s.mu.Lock()
s.ended = cb
s.mu.Unlock()
}
func (s *memoryStream) WatchConnection(ctx context.Context) {
<-ctx.Done()
}
func (s *memoryStream) CanSend() bool {
return s.isConnected()
}
func (s *memoryStream) GetSendQueue() chan []byte { return nil }
func (s *memoryStream) GetBufferedAmount() uint64 { return 0 }
func (s *memoryStream) Reconnect(string) {}
func (s *memoryStream) Capabilities() engine.Capabilities {
return engine.Capabilities{ByteStream: true, VideoTrack: true}
}
func (s *memoryStream) AddVideoTrack(track webrtc.TrackLocal) error {
s.mu.Lock()
s.track = track
s.mu.Unlock()
pc, err := s.ensurePC()
if err != nil {
return err
}
if _, err := pc.AddTrack(track); err != nil {
return fmt.Errorf("memory loopback AddTrack: %w", err)
}
return nil
}
func (s *memoryStream) SetVideoTrackHandler(cb func(*webrtc.TrackRemote, *webrtc.RTPReceiver)) {
s.mu.Lock()
s.trackCB = cb
s.mu.Unlock()
// Ensuring the PC up-front lets pion start receiving via the
// transceiver that AddTrack on the peer side will create even if
// SetVideoTrackHandler arrives before AddVideoTrack.
if _, err := s.ensurePC(); err != nil {
// e2e helper: swallow - the failure surfaces on the next
// AddVideoTrack/Connect path that actually needs the PC.
_ = err
}
}
// ensurePC lazily creates the in-process *webrtc.PeerConnection used to
// loopback video tracks to the peer stream in the same memoryRoom.
// Safe to call from any code path; first caller wins.
func (s *memoryStream) ensurePC() (*webrtc.PeerConnection, error) {
s.pcMu.Lock()
if s.pc != nil {
pc := s.pc
s.pcMu.Unlock()
return pc, nil
}
s.pcMu.Unlock()
mediaEngine := &webrtc.MediaEngine{}
if err := mediaEngine.RegisterDefaultCodecs(); err != nil {
return nil, fmt.Errorf("memory loopback RegisterDefaultCodecs: %w", err)
}
// Empty interceptor registry: NACK/RR/SR add nothing useful on an
// in-process loopback and just cost CPU.
api := webrtc.NewAPI(
webrtc.WithMediaEngine(mediaEngine),
webrtc.WithInterceptorRegistry(&pioninterceptor.Registry{}),
)
pc, err := api.NewPeerConnection(webrtc.Configuration{})
if err != nil {
return nil, fmt.Errorf("memory loopback NewPeerConnection: %w", err)
}
s.pcMu.Lock()
if s.pc != nil {
// Lost the race: someone else built a PC concurrently.
existing := s.pc
s.pcMu.Unlock()
_ = pc.Close()
return existing, nil
}
s.pc = pc
s.isOfferer = s.room.isFirstStream(s)
s.pcMu.Unlock()
pc.OnICECandidate(func(c *webrtc.ICECandidate) {
if c == nil {
return
}
peer := s.room.peerOf(s)
if peer == nil {
return
}
peer.acceptICE(c.ToJSON())
})
pc.OnTrack(func(remote *webrtc.TrackRemote, recv *webrtc.RTPReceiver) {
s.mu.Lock()
cb := s.trackCB
s.mu.Unlock()
if cb != nil {
cb(remote, recv)
}
})
return pc, nil
}
// acceptICE adds a remote ICE candidate to our PC. If the remote SDP
// hasn't been applied yet the candidate is queued and flushed by
// applyRemoteDescription. This matches the standard trickle-ICE
// buffering pattern that pion would otherwise reject with
// ErrPeerConnectionRemoteDescriptionNil.
func (s *memoryStream) acceptICE(c webrtc.ICECandidateInit) {
s.pcMu.Lock()
if s.pc == nil {
s.pcMu.Unlock()
return
}
if !s.remoteDescSet {
s.pendingICE = append(s.pendingICE, c)
s.pcMu.Unlock()
return
}
pc := s.pc
s.pcMu.Unlock()
_ = pc.AddICECandidate(c)
}
func (s *memoryStream) applyRemoteDescription(sdp webrtc.SessionDescription) error {
s.pcMu.Lock()
pc := s.pc
s.pcMu.Unlock()
if pc == nil {
return fmt.Errorf("%w (remote description)", errMemoryLoopbackPCClosed)
}
if err := pc.SetRemoteDescription(sdp); err != nil {
return fmt.Errorf("memory loopback SetRemoteDescription: %w", err)
}
s.pcMu.Lock()
pending := s.pendingICE
s.pendingICE = nil
s.remoteDescSet = true
s.pcMu.Unlock()
for _, c := range pending {
_ = pc.AddICECandidate(c)
}
return nil
}
// acceptOffer is the answerer half of the SDP exchange. The offerer
// invokes this synchronously on the peer through the parent room.
func (s *memoryStream) acceptOffer(offer webrtc.SessionDescription) (webrtc.SessionDescription, error) {
if err := s.applyRemoteDescription(offer); err != nil {
return webrtc.SessionDescription{}, err
}
s.pcMu.Lock()
pc := s.pc
s.pcMu.Unlock()
if pc == nil {
return webrtc.SessionDescription{}, fmt.Errorf("%w (answer)", errMemoryLoopbackPCClosed)
}
answer, err := pc.CreateAnswer(nil)
if err != nil {
return webrtc.SessionDescription{}, fmt.Errorf("memory loopback CreateAnswer: %w", err)
}
if err := pc.SetLocalDescription(answer); err != nil {
return webrtc.SessionDescription{}, fmt.Errorf("memory loopback answer SetLocalDescription: %w", err)
}
return answer, nil
}
// runNegotiation is the offerer-side flow. It waits for the peer stream
// to be connected (so its PC and any local tracks are in place), then
// drives one SDP offer/answer cycle. ICE candidates flow through
// acceptICE in the background.
func (s *memoryStream) runNegotiation(ctx context.Context) {
peer, err := s.waitForPeer(ctx)
if err != nil {
return
}
s.pcMu.Lock()
pc := s.pc
s.pcMu.Unlock()
if pc == nil {
return
}
offer, err := pc.CreateOffer(nil)
if err != nil {
return
}
if err := pc.SetLocalDescription(offer); err != nil {
return
}
answer, err := peer.acceptOffer(offer)
if err != nil {
return
}
_ = s.applyRemoteDescription(answer)
}
// waitForPeer polls the room until a peer stream is both connected
// (its byte-channel Connect has returned) and has built its own PC.
// Polls every 10ms; cheap enough for an in-process loopback.
func (s *memoryStream) waitForPeer(ctx context.Context) (*memoryStream, error) {
ticker := time.NewTicker(10 * time.Millisecond)
defer ticker.Stop()
for {
peer := s.room.peerOf(s)
if peer != nil && peer.isConnected() {
peer.pcMu.Lock()
ready := peer.pc != nil
peer.pcMu.Unlock()
if ready {
return peer, nil
}
}
select {
case <-ctx.Done():
return nil, fmt.Errorf("memory loopback waitForPeer: %w", ctx.Err())
case <-ticker.C:
}
}
}
func (s *memoryStream) isConnected() bool {
s.mu.Lock()
defer s.mu.Unlock()
return s.connected && !s.closed
}
func (s *memoryStream) triggerReconnect() {
s.mu.Lock()
reconnect := s.reconnect
ready := s.connected && !s.closed && reconnect != nil
s.mu.Unlock()
if ready {
reconnect()
}
}
func (s *memoryStream) triggerEnded(reason string) {
s.mu.Lock()
ended := s.ended
ready := s.connected && !s.closed && ended != nil
s.mu.Unlock()
if ready {
ended(reason)
}
}
func registerMemoryCarrier(t *testing.T) (string, *memoryRoom) {
t.Helper()
session.RegisterDefaults()
name := "e2e-memory-" + t.Name()
room := &memoryRoom{streams: make(map[*memoryStream]struct{})}
enginebuiltin.Register(name, func(_ context.Context, cfg enginebuiltin.Config) (engine.Session, error) {
stream := newMemoryStream(room, cfg.OnData)
room.mu.Lock()
room.streams[stream] = struct{}{}
room.mu.Unlock()
return stream, nil
})
return name, room
}
// newMemoryStream builds a memoryStream with its own lifecycle context.
// The context lives until Close, so async PC negotiation goroutines see
// it stay alive past streamTransport.Connect's deferred cancel.
func newMemoryStream(room *memoryRoom, onData func([]byte)) *memoryStream {
ctx, cancel := context.WithCancel(context.Background())
return &memoryStream{
room: room,
onData: onData,
streamCtx: ctx,
streamCancel: cancel,
}
}
func registerMemoryCarrierAs(t *testing.T, name string) {
t.Helper()
room := &memoryRoom{streams: make(map[*memoryStream]struct{})}
enginebuiltin.Register(name, func(_ context.Context, cfg enginebuiltin.Config) (engine.Session, error) {
stream := newMemoryStream(room, cfg.OnData)
room.mu.Lock()
room.streams[stream] = struct{}{}
room.mu.Unlock()
return stream, nil
})
}
func registerFailingCarrier(t *testing.T) string {
t.Helper()
session.RegisterDefaults()
name := "e2e-fail-" + t.Name()
enginebuiltin.Register(name, func(context.Context, enginebuiltin.Config) (engine.Session, error) {
return nil, errFailoverCarrier
})
return name
}
func builtInCarrierNames() []string {
return []string{"telemost", "wbstream", "jitsi"} //nolint:goconst // test literal, repetition is intentional
}
func builtInTransportNames() []string {
return []string{transportData, transportVideo, transportSEI, transportVP8}
}
func realE2ECaseExpectation(carrierName, transportName string) realE2EExpectation {
switch carrierName {
case "telemost":
switch transportName {
case transportVP8:
return realE2EExpectPass
case transportVideo:
return realE2EExpectPass
default:
return realE2EExpectFail
}
case "wbstream":
if transportName == transportData {
return realE2EExpectFail
}
return realE2EExpectPass
case "jitsi":
// Jitsi colibri-ws bridge channel maps cleanly onto the
// datachannel transport (raw bytes broadcast through
// EndpointMessage). Video transports go through pion's
// PeerConnection negotiated via Jingle session-accept.
//
// Jitsi video-path transports are marked Unstable. They depend on
// the external JVB ICE/media path and can flap on self-hosted
// instances (e.g. meet1.arbitr.ru): ICE may stay in checking or
// the video upstream may be suppressed even though signaling and
// the colibri-ws bridge are healthy. Flag the outcome, but don't
// fail the suite when these paths flap.
switch transportName {
case transportVideo, transportSEI, transportVP8:
return realE2EExpectUnstable
}
return realE2EExpectPass
default:
return realE2EExpectPass
}
}
func realE2EExpectationLabel(expectation realE2EExpectation) string {
switch expectation {
case realE2EExpectPass:
return "SUCCESS"
case realE2EExpectFail:
return "EXPECTED FAIL"
case realE2EExpectUnstable:
return "UNSTABLE"
default:
return "UNKNOWN"
}
}
// logUnstableOutcome records the result of an Unstable matrix entry
// without failing the test. Unstable combos exist to keep the matrix
// honest about transports that flap against a particular carrier
// (e.g. seichannel against meet1.arbitr.ru's bandwidth allocator)
// while still surfacing whether the run happened to pass or fail.
func logUnstableOutcome(t *testing.T, label, carrierName, transportName string, err error) {
t.Helper()
if err == nil {
t.Logf("%s PASS %s/%s", label, carrierName, transportName)
return
}
t.Logf("%s FAIL %s/%s: %v", label, carrierName, transportName, err)
}
func TestRealE2ECaseExpectation(t *testing.T) {
tests := []struct {
name string
carrier string
transport string
want realE2EExpectation
}{
{
name: "telemost datachannel is expected to fail",
carrier: "telemost",
transport: transportData,
want: realE2EExpectFail,
},
{
name: "telemost vp8channel is expected to pass",
carrier: "telemost",
transport: transportVP8,
want: realE2EExpectPass,
},
{
name: "wbstream datachannel is expected to fail",
carrier: "wbstream",
transport: transportData,
want: realE2EExpectFail,
},
{
name: "jitsi datachannel is expected to pass",
carrier: "jitsi",
transport: transportData,
want: realE2EExpectPass,
},
{
name: "jitsi vp8channel is unstable",
carrier: "jitsi",
transport: transportVP8,
want: realE2EExpectUnstable,
},
{
name: "jitsi videochannel is unstable",
carrier: "jitsi",
transport: transportVideo,
want: realE2EExpectUnstable,
},
{
name: "jitsi seichannel is unstable",
carrier: "jitsi",
transport: transportSEI,
want: realE2EExpectUnstable,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := realE2ECaseExpectation(tt.carrier, tt.transport); got != tt.want {
t.Fatalf("realE2ECaseExpectation(%q, %q) = %v, want %v", tt.carrier, tt.transport, got, tt.want)
}
})
}
}
func splitTestList(value string) []string {
parts := strings.Split(value, ",")
items := make([]string, 0, len(parts))
for _, part := range parts {
part = strings.TrimSpace(part)
if part != "" {
items = append(items, part)
}
}
return items
}
//nolint:cyclop // table-driven test naturally has many branches
func realRoomURL(ctx context.Context, t *testing.T, carrierName string) string {
t.Helper()
switch carrierName {
case "telemost":
room := *realE2ETelemostRoom
if room != "" && !strings.HasPrefix(room, "http://") && !strings.HasPrefix(room, "https://") {
room = "https://telemost.yandex.ru/j/" + room
}
return room
case "wbstream":
if *realE2EWBStreamRoom != "" {
return *realE2EWBStreamRoom
}
_ = ctx
t.Skip("skip wbstream real e2e: set -olcrtc.real-wbstream-room to an existing room ID")
return ""
case "jitsi":
// Jitsi has no notion of "creating" a room - names are conjured
// on first join. The default flag points at meet1.arbitr.ru
// by default. When the flag is left at its default value, a
// per-process random suffix is appended
// to the slug: two participants share a single room by design (one
// pair, one shared key), so any third participant - including another
// concurrent test process with the same shared key - would corrupt
// the wire protocol on both sides. Users overriding the flag are
// trusted to manage room uniqueness themselves.
_ = ctx
room := *realE2EJitsiRoom
if room == "" {
t.Skip("skip jitsi real e2e: empty -olcrtc.real-jitsi-room")
}
if room == defaultJitsiRoomURL {
room = defaultJitsiRoomWithSuffix()
}
return room
default:
return ""
}
}
var (
jitsiRoomOnce sync.Once //nolint:gochecknoglobals // per-process suffix cache
jitsiRoomURL string //nolint:gochecknoglobals // per-process suffix cache
)
// defaultJitsiRoomWithSuffix returns the default Jitsi room URL with a random
// 8-hex-char suffix appended to the slug. Computed once per test process and
// cached so all sub-tests (server + client) land in the same MUC.
func defaultJitsiRoomWithSuffix() string {
jitsiRoomOnce.Do(func() {
var b [4]byte
if _, err := rand.Read(b[:]); err != nil {
// crypto/rand failing on a healthy host is exceptional; fall back
// to PID to keep tests usable rather than blowing up here.
jitsiRoomURL = fmt.Sprintf("%s-%d", defaultJitsiRoomURL, os.Getpid())
return
}
jitsiRoomURL = defaultJitsiRoomURL + "-" + hex.EncodeToString(b[:])
})
return jitsiRoomURL
}
func requireRealRoom(ctx context.Context, t *testing.T, carrierName string) string {
t.Helper()
roomURL := realRoomURL(ctx, t, carrierName)
if roomURL == "" {
t.Fatalf("missing room for %s", carrierName)
}
return roomURL
}
func validSessionConfig(mode, carrierName, transportName string) session.Config {
return session.Config{
Mode: mode,
Transport: transportName,
Auth: carrierName,
RoomID: testRoom,
KeyHex: testKeyHex,
SOCKSHost: "127.0.0.1",
SOCKSPort: 1080,
DNSServer: localDNSServer,
Video: session.VideoConfig{
Width: 1080, Height: 1080, FPS: 30, Bitrate: "1M",
HW: videoHWNone, Codec: "tile", TileModule: 4, TileRS: 20,
},
VP8: session.VP8Config{FPS: 60, BatchSize: 64},
SEI: session.SEIConfig{FPS: 30, BatchSize: 4, FragmentSize: 512, AckTimeoutMS: 1500},
}
}
// e2eTransportOptions builds the per-transport options bundle the e2e tests
// pass into server.Config / client.Config. Values mirror the documented
// validSessionConfig defaults so server and client end up agreeing on the
// transport tuning.
func e2eTransportOptions(transportName string) transport.Options {
switch transportName {
case "videochannel":
return videochannel.Options{
Width: 1080,
Height: 1080,
FPS: 60,
Bitrate: "5000k",
HW: videoHWNone,
QRSize: 512,
QRRecovery: "low",
Codec: "qrcode",
TileModule: 4,
TileRS: 20,
}
case "vp8channel":
return vp8channel.Options{FPS: 60, BatchSize: 64}
case "seichannel":
return seichannel.Options{FPS: 30, BatchSize: 4, FragmentSize: 512, AckTimeoutMS: 1500}
}
return nil
}
func validTransportConfig(carrierName, transportName string) transport.Config {
cfg := validSessionConfig("cnc", carrierName, transportName)
return transport.Config{
Carrier: cfg.Auth,
RoomURL: testRoom,
DeviceID: "e2e-link-test",
Name: "e2e-" + carrierName + "-" + transportName,
DNSServer: cfg.DNSServer,
Options: e2eTransportOptions(transportName),
}
}
func startEchoServer(t *testing.T) string {
t.Helper()
var lc net.ListenConfig
ln, err := lc.Listen(context.Background(), "tcp4", "127.0.0.1:0")
if err != nil {
t.Fatalf("listen echo: %v", err)
}
t.Cleanup(func() { _ = ln.Close() })
go func() {
for {
conn, err := ln.Accept()
if err != nil {
return
}
go func() {
defer func() { _ = conn.Close() }()
_, _ = io.Copy(conn, conn)
}()
}
}()
return ln.Addr().String()
}
func freeLocalAddr(ctx context.Context, t *testing.T) string {
t.Helper()
var lc net.ListenConfig
ln, err := lc.Listen(ctx, "tcp4", "127.0.0.1:0")
if err != nil {
t.Fatalf("reserve local addr: %v", err)
}
addr := ln.Addr().String()
if err := ln.Close(); err != nil {
t.Fatalf("close reserved addr: %v", err)
}
return addr
}
func waitForReady(t *testing.T, ready <-chan struct{}) {
t.Helper()
waitForReadyWithin(t, ready, 3*time.Second)
}
// waitForReadyWithin is waitForReady with a caller-chosen budget. Video
// transports (videochannel, seichannel, vp8channel) need a few seconds
// to finish their DTLS+ICE+SDP+smux handshake even on a loopback, so
// callers that exercise those paths pass a longer timeout here.
func waitForReadyWithin(t *testing.T, ready <-chan struct{}, budget time.Duration) {
t.Helper()
select {
case <-ready:
case <-time.After(budget):
t.Fatalf("client did not become ready within %s", budget)
}
}
type tunnelRuntime struct {
socksAddr string
room *memoryRoom
cancel context.CancelFunc
serverErr chan error
clientErr chan error
stopWait time.Duration
}
func startTunnel(t *testing.T) *tunnelRuntime {
t.Helper()
carrierName, room := registerMemoryCarrier(t)
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)
socksAddr := freeLocalAddr(ctx, t)
serverErr := make(chan error, 1)
go func() {
serverErr <- server.Run(ctx, server.Config{
Transport: transportData,
Carrier: carrierName,
RoomURL: testRoom,
KeyHex: testKeyHex,
DNSServer: localDNSServer,
})
}()
room.waitConnected(t, 1)
ready := make(chan struct{})
clientErr := make(chan error, 1)
go func() {
clientErr <- client.RunWithReady(ctx, client.Config{
Transport: transportData,
Carrier: carrierName,
RoomURL: testRoom,
KeyHex: testKeyHex,
DeviceID: testClientDeviceID,
LocalAddr: socksAddr,
DNSServer: localDNSServer,
}, func() { close(ready) })
}()
waitForReady(t, ready)
return &tunnelRuntime{
socksAddr: socksAddr,
room: room,
cancel: cancel,
serverErr: serverErr,
clientErr: clientErr,
stopWait: 3 * time.Second,
}
}
//nolint:cyclop // setup naturally branches on server/client/ready/timeout/context outcomes
func startRealTunnel(
ctx context.Context,
t *testing.T,
carrierName, transportName, roomURL, _, clientDeviceID string,
) (*tunnelRuntime, error) {
t.Helper()
session.RegisterDefaults()
socksAddr := freeLocalAddr(ctx, t)
channelID := fmt.Sprintf("e2e-%d-%d", os.Getpid(), time.Now().UnixNano())
runCtx, cancel := context.WithCancel(ctx)
t.Cleanup(cancel)
serverErr := make(chan error, 1)
go func() {
serverErr <- server.Run(runCtx, server.Config{
Transport: transportName,
Carrier: carrierName,
RoomURL: roomURL,
ChannelID: channelID,
KeyHex: testKeyHex,
DNSServer: localDNSServer,
TransportOptions: e2eTransportOptions(transportName),
})
}()
select {
case err := <-serverErr:
cancel()
if err == nil {
return nil, errServerExitedBeforeClientStart
}
return nil, fmt.Errorf("server exited before client start: %w", err)
case <-time.After(2 * time.Second):
case <-runCtx.Done():
cancel()
return nil, fmt.Errorf("server context ended before client start: %w", runCtx.Err())
}
ready := make(chan struct{})
clientErr := make(chan error, 1)
go func() {
clientErr <- client.RunWithReady(runCtx, client.Config{
Transport: transportName,
Carrier: carrierName,
RoomURL: roomURL,
ChannelID: channelID,
KeyHex: testKeyHex,
DeviceID: clientDeviceID,
LocalAddr: socksAddr,
DNSServer: localDNSServer,
TransportOptions: e2eTransportOptions(transportName),
}, func() { close(ready) })
}()
select {
case <-ready:
case err := <-clientErr:
cancel()
if err == nil {
return nil, errClientExitedBeforeReady
}
return nil, fmt.Errorf("client exited before ready: %w", err)
case err := <-serverErr:
cancel()
if err == nil {
return nil, errServerExitedBeforeClientReady
}
return nil, fmt.Errorf("server exited before client ready: %w", err)
case <-time.After(*realE2ETimeout):
cancel()
return nil, errRealE2ENotReady
case <-runCtx.Done():
cancel()
return nil, fmt.Errorf("real e2e context ended before ready: %w", runCtx.Err())
}
return &tunnelRuntime{
socksAddr: socksAddr,
cancel: cancel,
serverErr: serverErr,
clientErr: clientErr,
stopWait: 20 * time.Second,
}, nil
}
func (r *tunnelRuntime) stop(t *testing.T) {
t.Helper()
if err := r.stopErr(); err != nil {
t.Fatal(err)
}
}
func (r *tunnelRuntime) waitStopped(t *testing.T) {
t.Helper()
if err := r.waitStoppedErr(); err != nil {
t.Fatal(err)
}
}
func (r *tunnelRuntime) stopErr() error {
r.cancel()
return r.waitStoppedErr()
}
func (r *tunnelRuntime) waitStoppedErr() error {
stopWait := r.stopWait
if stopWait <= 0 {
stopWait = 3 * time.Second
}
for _, item := range []struct {
name string
ch <-chan error
}{
{name: "client", ch: r.clientErr},
{name: "server", ch: r.serverErr},
} {
select {
case err := <-item.ch:
if err != nil {
return fmt.Errorf("%s returned error: %w", item.name, err)
}
case <-time.After(stopWait):
return fmt.Errorf("%w: %s", errTunnelDidNotStop, item.name)
}
}
return nil
}
func connectViaSOCKS(t *testing.T, socksAddr, targetAddr string) net.Conn {
t.Helper()
dialer := net.Dialer{Timeout: 2 * time.Second}
conn, err := dialer.DialContext(context.Background(), "tcp4", socksAddr)
if err != nil {
t.Fatalf("dial socks: %v", err)
}
if _, err := conn.Write([]byte{5, 1, 0}); err != nil {
_ = conn.Close()
t.Fatalf("write socks greeting: %v", err)
}
greeting := make([]byte, 2)
if _, err := io.ReadFull(conn, greeting); err != nil {
_ = conn.Close()
t.Fatalf("read socks greeting: %v", err)
}
if !bytes.Equal(greeting, []byte{5, 0}) {
_ = conn.Close()
t.Fatalf("socks greeting = %v, want [5 0]", greeting)
}
host, portText, err := net.SplitHostPort(targetAddr)
if err != nil {
_ = conn.Close()
t.Fatalf("split target addr: %v", err)
}
port, err := strconv.Atoi(portText)
if err != nil {
_ = conn.Close()
t.Fatalf("parse target port: %v", err)
}
req := make([]byte, 0, 10)
req = append(req, 5, 1, 0, 1)
req = append(req, net.ParseIP(host).To4()...)
var portBuf [2]byte
binary.BigEndian.PutUint16(portBuf[:], uint16(port)) //nolint:gosec // SOCKS5 port is uint16 by definition
req = append(req, portBuf[:]...)
if _, err := conn.Write(req); err != nil {
_ = conn.Close()
t.Fatalf("write socks connect: %v", err)
}
reply := make([]byte, 10)
if _, err := io.ReadFull(conn, reply); err != nil {
_ = conn.Close()
t.Fatalf("read socks connect reply: %v", err)
}
if !bytes.Equal(reply, []byte{5, 0, 0, 1, 0, 0, 0, 0, 0, 0}) {
_ = conn.Close()
t.Fatalf("socks reply = %v, want success", reply)
}
return conn
}
func TestBuiltInProviderTransportMatrixValidates(t *testing.T) {
session.RegisterDefaults()
for _, mode := range []string{"srv", "cnc"} {
t.Run(mode, func(t *testing.T) {
for _, carrierName := range builtInCarrierNames() {
t.Run(carrierName, func(t *testing.T) {
for _, transportName := range builtInTransportNames() {
t.Run(transportName, func(t *testing.T) {
cfg := validSessionConfig(mode, carrierName, transportName)
if err := session.Validate(cfg); err != nil {
t.Fatalf("Validate() error = %v", err)
}
})
}
})
}
})
}
}
func TestTransportCreatesAllProviderTransportCombinations(t *testing.T) {
session.RegisterDefaults()
for _, carrierName := range builtInCarrierNames() {
registerMemoryCarrierAs(t, carrierName)
}
for _, carrierName := range builtInCarrierNames() {
t.Run(carrierName, func(t *testing.T) {
for _, transportName := range builtInTransportNames() {
t.Run(transportName, func(t *testing.T) {
tr, err := transport.New(context.Background(), transportName, validTransportConfig(carrierName, transportName))
if err != nil {
t.Fatalf("transport.New() error = %v", err)
}
if err := tr.Close(); err != nil {
t.Fatalf("Close() error = %v", err)
}
})
}
})
}
}
func TestTransportConnectsFastProviderTransportMatrix(t *testing.T) {
session.RegisterDefaults()
for _, carrierName := range builtInCarrierNames() {
registerMemoryCarrierAs(t, carrierName)
}
for _, carrierName := range builtInCarrierNames() {
t.Run(carrierName, func(t *testing.T) {
for _, transportName := range []string{transportData, transportSEI} {
t.Run(transportName, func(t *testing.T) {
tr, err := transport.New(context.Background(), transportName, validTransportConfig(carrierName, transportName))
if err != nil {
t.Fatalf("transport.New() error = %v", err)
}
if err := tr.Connect(context.Background()); err != nil {
t.Fatalf("Connect() error = %v", err)
}
assertTransportCanSendAfterConnect(t, tr, transportName)
if err := tr.Close(); err != nil {
t.Fatalf("Close() error = %v", err)
}
})
}
})
}
}
func assertTransportCanSendAfterConnect(t *testing.T, tr transport.Transport, transportName string) {
t.Helper()
if transportName == transportSEI {
if tr.CanSend() {
t.Fatal("CanSend() = true before peer seichannel frame")
}
return
}
if !tr.CanSend() {
t.Fatal("CanSend() = false, want true")
}
}
//nolint:cyclop // table-driven test naturally has many branches
func TestRealProviderTransportMatrix(t *testing.T) {
if !*realE2E {
t.Skip("real provider e2e disabled; pass -olcrtc.real-e2e with provider room flags")
}
carriers := splitTestList(*realE2ECarriers)
transports := splitTestList(*realE2ETransports)
if len(carriers) == 0 {
t.Fatal("no real e2e carriers selected")
}
if len(transports) == 0 {
t.Fatal("no real e2e transports selected")
}
echoAddr := startEchoServer(t)
for _, carrierName := range carriers {
t.Run(carrierName, func(t *testing.T) {
roomCtx, cancelRoom := context.WithTimeout(context.Background(), *realE2ETimeout)
defer cancelRoom()
roomURL := requireRealRoom(roomCtx, t, carrierName)
var authFailed bool
for _, transportName := range transports {
t.Run(transportName, func(t *testing.T) {
if authFailed {
t.Skip("skipping: carrier auth failed on previous transport")
}
expectation := realE2ECaseExpectation(carrierName, transportName)
label := realE2EExpectationLabel(expectation)
err := runRealE2ECase(t, carrierName, transportName, roomURL, echoAddr)
if err != nil && errors.Is(err, enginebuiltin.ErrAuthFailed) {
authFailed = true
t.Skipf("skip %s real e2e: auth failed: %v", carrierName, err)
}
switch {
case err == nil && expectation == realE2EExpectPass:
t.Logf("%s %s/%s", label, carrierName, transportName)
case err == nil && expectation == realE2EExpectFail:
t.Fatalf("UNEXPECTED SUCCESS %s/%s", carrierName, transportName)
case err != nil && expectation == realE2EExpectPass:
t.Fatalf("EXPECTED SUCCESS %s/%s failed: %v", carrierName, transportName, err)
case err != nil && expectation == realE2EExpectFail:
t.Logf("%s %s/%s: %v", label, carrierName, transportName, err)
case expectation == realE2EExpectUnstable:
logUnstableOutcome(t, label, carrierName, transportName, err)
}
})
}
})
}
}
func runRealE2ECase(t *testing.T, carrierName, transportName, roomURL, echoAddr string) (err error) {
t.Helper()
ctx, cancel := context.WithTimeout(context.Background(), *realE2ETimeout)
defer cancel()
rt, err := startRealTunnel(ctx, t, carrierName, transportName, roomURL, testClientDeviceID, testClientDeviceID)
if err != nil {
return err
}
defer func() {
if stopErr := rt.stopErr(); err == nil && stopErr != nil {
err = stopErr
}
}()
conn, err := connectViaSOCKSWithin(rt.socksAddr, echoAddr, *realE2ETimeout)
if err != nil {
return err
}
defer func() { _ = conn.Close() }()
payload := []byte("olcrtc-real-e2e-" + carrierName + "-" + transportName + "\n")
if _, err := conn.Write(payload); err != nil {
return fmt.Errorf("write real e2e payload: %w", err)
}
if err := conn.SetReadDeadline(time.Now().Add(*realE2ETimeout)); err != nil {
return fmt.Errorf("set real e2e read deadline: %w", err)
}
line, err := bufio.NewReader(conn).ReadBytes('\n')
if err != nil {
return fmt.Errorf("read real e2e echo: %w", err)
}
if !bytes.Equal(line, payload) {
return fmt.Errorf("%w: got %q, want %q", errRealE2EEchoMismatch, line, payload)
}
return nil
}
func TestClientServerSOCKSTunnelOverMemoryDatachannel(t *testing.T) {
echoAddr := startEchoServer(t)
rt := startTunnel(t)
defer rt.stop(t)
conn := connectViaSOCKS(t, rt.socksAddr, echoAddr)
defer func() { _ = conn.Close() }()
payload := []byte("olcrtc-e2e-payload\n")
if _, err := conn.Write(payload); err != nil {
t.Fatalf("write tunneled payload: %v", err)
}
if err := conn.SetReadDeadline(time.Now().Add(3 * time.Second)); err != nil {
t.Fatalf("set read deadline: %v", err)
}
line, err := bufio.NewReader(conn).ReadBytes('\n')
if err != nil {
t.Fatalf("read tunneled echo: %v", err)
}
if !bytes.Equal(line, payload) {
t.Fatalf("echo = %q, want %q", line, payload)
}
}
func TestFrequentReconnectsStillAllowNewSOCKSConnections(t *testing.T) {
echoAddr := startEchoServer(t)
rt := startTunnel(t)
defer rt.stop(t)
for i := range 5 {
rt.room.triggerReconnect()
conn := eventuallyConnectViaSOCKS(t, rt.socksAddr, echoAddr)
payload := fmt.Appendf(nil, "after-reconnect-%d\n", i)
if _, err := conn.Write(payload); err != nil {
_ = conn.Close()
t.Fatalf("write after reconnect %d: %v", i, err)
}
if err := conn.SetReadDeadline(time.Now().Add(3 * time.Second)); err != nil {
_ = conn.Close()
t.Fatalf("set deadline after reconnect %d: %v", i, err)
}
line, err := bufio.NewReader(conn).ReadBytes('\n')
_ = conn.Close()
if err != nil {
t.Fatalf("read after reconnect %d: %v", i, err)
}
if !bytes.Equal(line, payload) {
t.Fatalf("echo after reconnect %d = %q, want %q", i, line, payload)
}
}
}
func TestSupervisorFailoverProfilesReachWorkingSOCKS(t *testing.T) {
echoAddr := startEchoServer(t)
failingCarrier := registerFailingCarrier(t)
memoryCarrier, room := registerMemoryCarrier(t)
ctx, cancel := context.WithCancel(context.Background())
t.Cleanup(cancel)
socksAddr := freeLocalAddr(ctx, t)
socksHost, socksPort := splitHostPort(t, socksAddr)
serverProfiles := []supervisor.Profile{
{Name: "failing-server", Config: failoverSessionConfig("srv", failingCarrier, "", 0)},
{Name: "memory-server", Config: failoverSessionConfig("srv", memoryCarrier, "", 0)},
}
clientProfiles := []supervisor.Profile{
{Name: "failing-client", Config: failoverSessionConfig("cnc", failingCarrier, socksHost, socksPort)},
{Name: "memory-client", Config: failoverSessionConfig("cnc", memoryCarrier, socksHost, socksPort)},
}
started := make(chan string, 8)
serverErr := make(chan error, 1)
go func() {
serverErr <- supervisor.Run(ctx, failoverE2EConfig(serverProfiles, started, "server"), session.Run)
}()
room.waitConnected(t, 1)
ready := make(chan struct{})
var readyOnce sync.Once
clientErr := make(chan error, 1)
go func() {
runClientProfile := func(ctx context.Context, cfg session.Config) error {
return client.RunWithReady(ctx, clientConfigFromSession(cfg, socksAddr), func() {
if cfg.Auth == memoryCarrier {
readyOnce.Do(func() { close(ready) })
}
})
}
clientErr <- supervisor.Run(ctx, failoverE2EConfig(clientProfiles, started, "client"), runClientProfile)
}()
waitForReady(t, ready)
conn := eventuallyConnectViaSOCKS(t, socksAddr, echoAddr)
defer func() { _ = conn.Close() }()
payload := []byte("olcrtc-failover-e2e\n")
if _, err := conn.Write(payload); err != nil {
t.Fatalf("write failover payload: %v", err)
}
if err := conn.SetReadDeadline(time.Now().Add(3 * time.Second)); err != nil {
t.Fatalf("set failover read deadline: %v", err)
}
line, err := bufio.NewReader(conn).ReadBytes('\n')
if err != nil {
t.Fatalf("read failover echo: %v", err)
}
if !bytes.Equal(line, payload) {
t.Fatalf("failover echo = %q, want %q", line, payload)
}
requireStartedProfiles(t, started, []string{
"server:failing-server",
"server:memory-server",
"client:failing-client",
"client:memory-client",
})
cancel()
waitSupervisorStopped(t, "client", clientErr)
waitSupervisorStopped(t, "server", serverErr)
}
func failoverSessionConfig(mode, carrierName, socksHost string, socksPort int) session.Config {
cfg := session.Config{
Mode: mode,
Transport: transportData,
Auth: carrierName,
RoomID: testRoom,
KeyHex: testKeyHex,
DNSServer: localDNSServer,
}
if mode == "cnc" {
cfg.SOCKSHost = socksHost
cfg.SOCKSPort = socksPort
}
return cfg
}
func clientConfigFromSession(cfg session.Config, socksAddr string) client.Config {
return client.Config{
Transport: cfg.Transport,
Carrier: cfg.Auth,
RoomURL: cfg.RoomID,
KeyHex: cfg.KeyHex,
LocalAddr: socksAddr,
DNSServer: cfg.DNSServer,
DeviceID: testClientDeviceID,
TransportOptions: e2eTransportOptions(cfg.Transport),
Engine: cfg.Engine,
URL: cfg.URL,
Token: cfg.Token,
}
}
func failoverE2EConfig(
profiles []supervisor.Profile,
started chan<- string,
side string,
) supervisor.Config {
return supervisor.Config{
Profiles: profiles,
RetryDelay: time.Millisecond,
OnProfileStart: func(profile supervisor.Profile, _ int) {
select {
case started <- side + ":" + profile.Name:
default:
}
},
}
}
func splitHostPort(t *testing.T, addr string) (string, int) {
t.Helper()
host, portText, err := net.SplitHostPort(addr)
if err != nil {
t.Fatalf("split host port %q: %v", addr, err)
}
port, err := strconv.Atoi(portText)
if err != nil {
t.Fatalf("parse port %q: %v", portText, err)
}
return host, port
}
func requireStartedProfiles(t *testing.T, started <-chan string, want []string) {
t.Helper()
seen := make(map[string]bool)
deadline := time.After(3 * time.Second)
for len(seen) < len(want) {
select {
case item := <-started:
seen[item] = true
case <-deadline:
t.Fatalf("started profiles = %v, want all %v", seen, want)
}
}
for _, item := range want {
if !seen[item] {
t.Fatalf("started profiles = %v, missing %s", seen, item)
}
}
}
func waitSupervisorStopped(t *testing.T, name string, ch <-chan error) {
t.Helper()
select {
case err := <-ch:
if err != nil {
t.Fatalf("%s supervisor returned error: %v", name, err)
}
case <-time.After(3 * time.Second):
t.Fatalf("%s supervisor did not stop", name)
}
}
func TestEndedCallbackStopsClientAndServer(t *testing.T) {
rt := startTunnel(t)
rt.room.triggerEnded("conference ended")
rt.waitStopped(t)
}
func eventuallyConnectViaSOCKS(t *testing.T, socksAddr, targetAddr string) net.Conn {
t.Helper()
return eventuallyConnectViaSOCKSWithin(t, socksAddr, targetAddr, 3*time.Second)
}
func eventuallyConnectViaSOCKSWithin(t *testing.T, socksAddr, targetAddr string, timeout time.Duration) net.Conn {
t.Helper()
conn, err := connectViaSOCKSWithin(socksAddr, targetAddr, timeout)
if err != nil {
t.Fatal(err)
}
return conn
}
func connectViaSOCKSWithin(socksAddr, targetAddr string, timeout time.Duration) (net.Conn, error) {
deadline := time.Now().Add(timeout)
var lastErr error
attempt := 0
for time.Now().Before(deadline) {
conn, err := tryConnectViaSOCKS(socksAddr, targetAddr)
if err == nil {
return conn, nil
}
lastErr = err
attempt++
sleep := 250 * time.Millisecond
if attempt > 3 {
sleep = time.Second
}
time.Sleep(sleep)
}
return nil, fmt.Errorf("connect via SOCKS failed after %s: %w", timeout, lastErr)
}
func tryConnectViaSOCKS(socksAddr, targetAddr string) (net.Conn, error) {
dialer := net.Dialer{Timeout: 500 * time.Millisecond}
conn, err := dialer.DialContext(context.Background(), "tcp4", socksAddr)
if err != nil {
return nil, fmt.Errorf("dial socks: %w", err)
}
if _, err := conn.Write([]byte{5, 1, 0}); err != nil {
_ = conn.Close()
return nil, fmt.Errorf("write greeting: %w", err)
}
greeting := make([]byte, 2)
if _, err := io.ReadFull(conn, greeting); err != nil {
_ = conn.Close()
return nil, fmt.Errorf("read greeting: %w", err)
}
if !bytes.Equal(greeting, []byte{5, 0}) {
_ = conn.Close()
return nil, fmt.Errorf("%w: %v", errSocksUnexpectedHello, greeting)
}
host, portText, err := net.SplitHostPort(targetAddr)
if err != nil {
_ = conn.Close()
return nil, fmt.Errorf("split host port: %w", err)
}
port, err := strconv.Atoi(portText)
if err != nil {
_ = conn.Close()
return nil, fmt.Errorf("parse port: %w", err)
}
req := make([]byte, 0, 10)
req = append(req, 5, 1, 0, 1)
req = append(req, net.ParseIP(host).To4()...)
var portBuf [2]byte
binary.BigEndian.PutUint16(portBuf[:], uint16(port)) //nolint:gosec // SOCKS5 port is uint16 by definition
req = append(req, portBuf[:]...)
if _, err := conn.Write(req); err != nil {
_ = conn.Close()
return nil, fmt.Errorf("write connect request: %w", err)
}
reply := make([]byte, 10)
if _, err := io.ReadFull(conn, reply); err != nil {
_ = conn.Close()
return nil, fmt.Errorf("read connect reply: %w", err)
}
if !bytes.Equal(reply, []byte{5, 0, 0, 1, 0, 0, 0, 0, 0, 0}) {
_ = conn.Close()
return nil, fmt.Errorf("%w: %v", errSocksUnexpectedReply, reply)
}
return conn, nil
}
func TestLargeTransferOverTunnel(t *testing.T) {
echoAddr := startEchoServer(t)
rt := startTunnel(t)
defer rt.stop(t)
size := int64(32 << 20)
conn := connectViaSOCKS(t, rt.socksAddr, echoAddr)
defer func() { _ = conn.Close() }()
if err := streamPatternAndVerifyEcho(conn, size); err != nil {
t.Fatalf("large transfer %d bytes failed: %v", size, err)
}
}
func streamPatternAndVerifyEcho(conn net.Conn, size int64) error {
errCh := make(chan error, 1)
go func() {
buf := make([]byte, 32*1024)
var written int64
for written < size {
n := len(buf)
if remaining := size - written; remaining < int64(n) {
n = int(remaining)
}
fillPattern(buf[:n], written)
if _, err := conn.Write(buf[:n]); err != nil {
errCh <- fmt.Errorf("write at %d: %w", written, err)
return
}
written += int64(n)
}
errCh <- nil
}()
buf := make([]byte, 32*1024)
want := make([]byte, len(buf))
var read int64
for read < size {
n := len(buf)
if remaining := size - read; remaining < int64(n) {
n = int(remaining)
}
if err := conn.SetReadDeadline(time.Now().Add(10 * time.Second)); err != nil {
return fmt.Errorf("set read deadline: %w", err)
}
if _, err := io.ReadFull(conn, buf[:n]); err != nil {
return fmt.Errorf("read at %d: %w", read, err)
}
fillPattern(want[:n], read)
if !bytes.Equal(buf[:n], want[:n]) {
return fmt.Errorf("%w %d", errPayloadMismatchOffset, read)
}
read += int64(n)
}
if err := <-errCh; err != nil {
return err
}
return nil
}
func fillPattern(buf []byte, offset int64) {
for i := range buf {
buf[i] = byte((offset + int64(i)*31 + 7) & 0xff)
}
}