test(e2e): add in-memory WebRTC loopback for video transport testing

This commit is contained in:
zarazaex69
2026-05-24 04:45:06 +03:00
parent f63aa0bc43
commit 4d22169be6
3 changed files with 467 additions and 13 deletions

View File

@@ -236,7 +236,10 @@ func startLocalSoakTunnel(t *testing.T, transportName string) *tunnelRuntime {
DNSServer: localDNSServer,
}, func() { close(ready) })
}()
waitForReady(t, ready)
// Video transports go through ICE+DTLS+SRTP+smux before becoming
// ready, so reuse the longer setup budget the soak test allows for
// the SOCKS connect.
waitForReadyWithin(t, ready, 30*time.Second)
return &tunnelRuntime{
socksAddr: socksAddr,

View File

@@ -0,0 +1,157 @@
package e2e
import (
"context"
"errors"
"testing"
"time"
"github.com/pion/webrtc/v4"
"github.com/pion/webrtc/v4/pkg/media"
)
// errPionLoopbackNoRTP signals that the receiving PeerConnection never
// surfaced a single RTP packet during the proof-of-concept window.
var errPionLoopbackNoRTP = errors.New("pion loopback: no RTP delivered")
// vp8KeyframeStub is the smallest byte sequence pion accepts as a VP8
// keyframe sample. It is meaningless visually but valid enough that the
// depacketizer/sample builder on the receiving side accepts the RTP
// stream and OnTrack fires. We are not testing video quality here — only
// that two PeerConnections inside the same process can negotiate, gather
// loopback ICE candidates, and exchange RTP without an external bridge.
//
//nolint:gochecknoglobals // test-only constant byte literal
var vp8KeyframeStub = []byte{
0x10, 0x00, 0x00, 0x9d, 0x01, 0x2a, 0x40, 0x01, 0xf0, 0x00,
}
// TestPionLoopbackPOC is a focused proof-of-concept: spin up two pion
// PeerConnections inside one process, wire them together with manual
// SDP+trickle-ICE exchange, publish a VP8 track on one side, and wait
// for OnTrack to surface real RTP on the other side. If this test
// passes, the same wiring can be embedded into memoryStream so that
// videochannel/seichannel/vp8channel transports complete their
// handshake under the in-memory carrier used by the e2e suite.
//
// The test is intentionally small and self-contained — no production
// code is touched, no e2e fixtures are involved.
//
//nolint:gocognit,cyclop // PoC is intentionally linear; splitting helpers hurts readability
func TestPionLoopbackPOC(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
defer cancel()
api := webrtc.NewAPI()
pcOffer, err := api.NewPeerConnection(webrtc.Configuration{})
if err != nil {
t.Fatalf("new offer pc: %v", err)
}
t.Cleanup(func() { _ = pcOffer.Close() })
pcAnswer, err := api.NewPeerConnection(webrtc.Configuration{})
if err != nil {
t.Fatalf("new answer pc: %v", err)
}
t.Cleanup(func() { _ = pcAnswer.Close() })
// Trickle ICE: forward every gathered candidate straight to the peer.
// A nil candidate terminates gathering and is intentionally ignored.
pcOffer.OnICECandidate(func(c *webrtc.ICECandidate) {
if c == nil {
return
}
if err := pcAnswer.AddICECandidate(c.ToJSON()); err != nil {
t.Logf("answer AddICECandidate: %v", err)
}
})
pcAnswer.OnICECandidate(func(c *webrtc.ICECandidate) {
if c == nil {
return
}
if err := pcOffer.AddICECandidate(c.ToJSON()); err != nil {
t.Logf("offer AddICECandidate: %v", err)
}
})
gotRTP := make(chan struct{})
pcAnswer.OnTrack(func(track *webrtc.TrackRemote, _ *webrtc.RTPReceiver) {
buf := make([]byte, 1500)
// First read confirms RTP is actually flowing end-to-end.
if _, _, err := track.Read(buf); err != nil {
t.Logf("answer first Read: %v", err)
return
}
select {
case <-gotRTP:
default:
close(gotRTP)
}
// Keep draining so pion's per-track buffer doesn't back up.
for {
if _, _, err := track.Read(buf); err != nil {
return
}
}
})
localTrack, err := webrtc.NewTrackLocalStaticSample(
webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeVP8, ClockRate: 90000},
"poc-stream",
"poc-track",
)
if err != nil {
t.Fatalf("new local track: %v", err)
}
if _, err := pcOffer.AddTrack(localTrack); err != nil {
t.Fatalf("offer AddTrack: %v", err)
}
offer, err := pcOffer.CreateOffer(nil)
if err != nil {
t.Fatalf("create offer: %v", err)
}
if err := pcOffer.SetLocalDescription(offer); err != nil {
t.Fatalf("offer SetLocalDescription: %v", err)
}
if err := pcAnswer.SetRemoteDescription(offer); err != nil {
t.Fatalf("answer SetRemoteDescription: %v", err)
}
answer, err := pcAnswer.CreateAnswer(nil)
if err != nil {
t.Fatalf("create answer: %v", err)
}
if err := pcAnswer.SetLocalDescription(answer); err != nil {
t.Fatalf("answer SetLocalDescription: %v", err)
}
if err := pcOffer.SetRemoteDescription(answer); err != nil {
t.Fatalf("offer SetRemoteDescription: %v", err)
}
// Pump tiny VP8 keyframe stubs at 20 fps until OnTrack fires or
// the test times out. Real frame content is irrelevant — we only
// care that RTP packets traverse the loopback ICE/DTLS/SRTP stack.
go func() {
ticker := time.NewTicker(50 * time.Millisecond)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
_ = localTrack.WriteSample(media.Sample{
Data: vp8KeyframeStub,
Duration: 50 * time.Millisecond,
})
}
}
}()
select {
case <-gotRTP:
t.Logf("pion loopback delivered RTP successfully")
case <-ctx.Done():
t.Fatalf("%v: timed out after %s", errPionLoopbackNoRTP, 15*time.Second)
}
}

View File

@@ -29,6 +29,7 @@ import (
"github.com/openlibrecommunity/olcrtc/internal/transport/seichannel"
"github.com/openlibrecommunity/olcrtc/internal/transport/videochannel"
"github.com/openlibrecommunity/olcrtc/internal/transport/vp8channel"
pioninterceptor "github.com/pion/interceptor"
"github.com/pion/webrtc/v4"
)
@@ -46,13 +47,14 @@ const (
)
var (
errRealE2ENotReady = errors.New("real e2e client did not become ready")
errTunnelDidNotStop = errors.New("tunnel goroutine did not stop")
errRealE2EEchoMismatch = errors.New("real e2e echo payload mismatch")
errSocksUnexpectedReply = errors.New("unexpected SOCKS5 reply")
errSocksUnexpectedHello = errors.New("unexpected SOCKS5 greeting")
errPayloadMismatchOffset = errors.New("payload mismatch at offset")
errFailoverCarrier = errors.New("intentional failover carrier failure")
errRealE2ENotReady = errors.New("real e2e client did not become ready")
errTunnelDidNotStop = errors.New("tunnel goroutine did not stop")
errRealE2EEchoMismatch = errors.New("real e2e echo payload mismatch")
errSocksUnexpectedReply = errors.New("unexpected SOCKS5 reply")
errSocksUnexpectedHello = errors.New("unexpected SOCKS5 greeting")
errPayloadMismatchOffset = errors.New("payload mismatch at offset")
errFailoverCarrier = errors.New("intentional failover carrier failure")
errMemoryLoopbackPCClosed = errors.New("memory loopback: PC closed during SDP exchange")
errServerExitedBeforeClientStart = errors.New("server exited cleanly before client start")
errClientExitedBeforeReady = errors.New("client exited cleanly before ready")
@@ -181,6 +183,35 @@ func (r *memoryRoom) triggerEnded(reason string) {
}
}
// peerOf returns the other stream in a 2-party room or nil if there is
// no peer (yet). Video loopback relies on a single 1:1 partner so we
// just pick the first non-self stream we see.
func (r *memoryRoom) peerOf(s *memoryStream) *memoryStream {
r.mu.Lock()
defer r.mu.Unlock()
for stream := range r.streams {
if stream != s {
return stream
}
}
return nil
}
// isFirstStream reports whether s is the only stream currently in the
// room. Used to deterministically pick the SDP offerer: the first
// stream to register builds its PC first and becomes the offerer.
func (r *memoryRoom) isFirstStream(s *memoryStream) bool {
r.mu.Lock()
defer r.mu.Unlock()
if len(r.streams) == 0 {
return true
}
if _, ok := r.streams[s]; ok && len(r.streams) == 1 {
return true
}
return false
}
type memoryStream struct {
room *memoryRoom
onData func([]byte)
@@ -193,9 +224,31 @@ type memoryStream struct {
track webrtc.TrackLocal
trackCB func(*webrtc.TrackRemote, *webrtc.RTPReceiver)
pending [][]byte
// Video-track plumbing. Until AddVideoTrack or
// SetVideoTrackHandler is called the embedded *webrtc.PeerConnection
// stays nil and memoryStream behaves as the original byte-only
// carrier. Once a video transport touches us we lazily build a real
// pion PC; SDP/ICE exchange with the peer stream is plumbed through
// the parent room, so two streams in the same room loopback their
// video tracks through a real (in-process) WebRTC stack.
//
// streamCtx is owned by the stream itself (cancelled in Close), not
// by the short-lived ctx that Connect receives — the video
// transport's connectCtx fires its deferred cancel as soon as
// streamTransport.Connect returns, which would otherwise tear down
// the async SDP negotiation goroutine before it can find its peer.
streamCtx context.Context //nolint:containedctx // owned lifecycle ctx for the loopback PC
streamCancel context.CancelFunc
pcMu sync.Mutex
pc *webrtc.PeerConnection
isOfferer bool
negotiateOnce sync.Once
pendingICE []webrtc.ICECandidateInit
remoteDescSet bool
}
func (s *memoryStream) Connect(context.Context) error {
func (s *memoryStream) Connect(_ context.Context) error {
s.mu.Lock()
s.connected = true
pending := s.pending
@@ -207,6 +260,17 @@ func (s *memoryStream) Connect(context.Context) error {
onData(payload)
}
}
// If a video transport already touched us, kick off the offerer-side
// SDP negotiation asynchronously. Connect itself stays fast so that
// room.waitConnected (which only checks that the byte-channel side is
// up) doesn't deadlock waiting for a peer that hasn't joined yet.
s.pcMu.Lock()
needNegotiate := s.pc != nil && s.isOfferer
s.pcMu.Unlock()
if needNegotiate {
go s.negotiateOnce.Do(func() { s.runNegotiation(s.streamCtx) })
}
return nil
}
@@ -254,6 +318,18 @@ func (s *memoryStream) Close() error {
s.closed = true
s.connected = false
s.mu.Unlock()
if s.streamCancel != nil {
s.streamCancel()
}
s.pcMu.Lock()
pc := s.pc
s.pc = nil
s.pcMu.Unlock()
if pc != nil {
_ = pc.Close()
}
return nil
}
@@ -289,6 +365,14 @@ func (s *memoryStream) AddVideoTrack(track webrtc.TrackLocal) error {
s.mu.Lock()
s.track = track
s.mu.Unlock()
pc, err := s.ensurePC()
if err != nil {
return err
}
if _, err := pc.AddTrack(track); err != nil {
return fmt.Errorf("memory loopback AddTrack: %w", err)
}
return nil
}
@@ -296,6 +380,194 @@ func (s *memoryStream) SetVideoTrackHandler(cb func(*webrtc.TrackRemote, *webrtc
s.mu.Lock()
s.trackCB = cb
s.mu.Unlock()
// Ensuring the PC up-front lets pion start receiving via the
// transceiver that AddTrack on the peer side will create even if
// SetVideoTrackHandler arrives before AddVideoTrack.
if _, err := s.ensurePC(); err != nil {
// e2e helper: swallow — the failure surfaces on the next
// AddVideoTrack/Connect path that actually needs the PC.
_ = err
}
}
// ensurePC lazily creates the in-process *webrtc.PeerConnection used to
// loopback video tracks to the peer stream in the same memoryRoom.
// Safe to call from any code path; first caller wins.
func (s *memoryStream) ensurePC() (*webrtc.PeerConnection, error) {
s.pcMu.Lock()
if s.pc != nil {
pc := s.pc
s.pcMu.Unlock()
return pc, nil
}
s.pcMu.Unlock()
mediaEngine := &webrtc.MediaEngine{}
if err := mediaEngine.RegisterDefaultCodecs(); err != nil {
return nil, fmt.Errorf("memory loopback RegisterDefaultCodecs: %w", err)
}
// Empty interceptor registry: NACK/RR/SR add nothing useful on an
// in-process loopback and just cost CPU.
api := webrtc.NewAPI(
webrtc.WithMediaEngine(mediaEngine),
webrtc.WithInterceptorRegistry(&pioninterceptor.Registry{}),
)
pc, err := api.NewPeerConnection(webrtc.Configuration{})
if err != nil {
return nil, fmt.Errorf("memory loopback NewPeerConnection: %w", err)
}
s.pcMu.Lock()
if s.pc != nil {
// Lost the race: someone else built a PC concurrently.
existing := s.pc
s.pcMu.Unlock()
_ = pc.Close()
return existing, nil
}
s.pc = pc
s.isOfferer = s.room.isFirstStream(s)
s.pcMu.Unlock()
pc.OnICECandidate(func(c *webrtc.ICECandidate) {
if c == nil {
return
}
peer := s.room.peerOf(s)
if peer == nil {
return
}
peer.acceptICE(c.ToJSON())
})
pc.OnTrack(func(remote *webrtc.TrackRemote, recv *webrtc.RTPReceiver) {
s.mu.Lock()
cb := s.trackCB
s.mu.Unlock()
if cb != nil {
cb(remote, recv)
}
})
return pc, nil
}
// acceptICE adds a remote ICE candidate to our PC. If the remote SDP
// hasn't been applied yet the candidate is queued and flushed by
// applyRemoteDescription. This matches the standard trickle-ICE
// buffering pattern that pion would otherwise reject with
// ErrPeerConnectionRemoteDescriptionNil.
func (s *memoryStream) acceptICE(c webrtc.ICECandidateInit) {
s.pcMu.Lock()
if s.pc == nil {
s.pcMu.Unlock()
return
}
if !s.remoteDescSet {
s.pendingICE = append(s.pendingICE, c)
s.pcMu.Unlock()
return
}
pc := s.pc
s.pcMu.Unlock()
_ = pc.AddICECandidate(c)
}
func (s *memoryStream) applyRemoteDescription(sdp webrtc.SessionDescription) error {
s.pcMu.Lock()
pc := s.pc
s.pcMu.Unlock()
if pc == nil {
return fmt.Errorf("%w (remote description)", errMemoryLoopbackPCClosed)
}
if err := pc.SetRemoteDescription(sdp); err != nil {
return fmt.Errorf("memory loopback SetRemoteDescription: %w", err)
}
s.pcMu.Lock()
pending := s.pendingICE
s.pendingICE = nil
s.remoteDescSet = true
s.pcMu.Unlock()
for _, c := range pending {
_ = pc.AddICECandidate(c)
}
return nil
}
// acceptOffer is the answerer half of the SDP exchange. The offerer
// invokes this synchronously on the peer through the parent room.
func (s *memoryStream) acceptOffer(offer webrtc.SessionDescription) (webrtc.SessionDescription, error) {
if err := s.applyRemoteDescription(offer); err != nil {
return webrtc.SessionDescription{}, err
}
s.pcMu.Lock()
pc := s.pc
s.pcMu.Unlock()
if pc == nil {
return webrtc.SessionDescription{}, fmt.Errorf("%w (answer)", errMemoryLoopbackPCClosed)
}
answer, err := pc.CreateAnswer(nil)
if err != nil {
return webrtc.SessionDescription{}, fmt.Errorf("memory loopback CreateAnswer: %w", err)
}
if err := pc.SetLocalDescription(answer); err != nil {
return webrtc.SessionDescription{}, fmt.Errorf("memory loopback answer SetLocalDescription: %w", err)
}
return answer, nil
}
// runNegotiation is the offerer-side flow. It waits for the peer stream
// to be connected (so its PC and any local tracks are in place), then
// drives one SDP offer/answer cycle. ICE candidates flow through
// acceptICE in the background.
func (s *memoryStream) runNegotiation(ctx context.Context) {
peer, err := s.waitForPeer(ctx)
if err != nil {
return
}
s.pcMu.Lock()
pc := s.pc
s.pcMu.Unlock()
if pc == nil {
return
}
offer, err := pc.CreateOffer(nil)
if err != nil {
return
}
if err := pc.SetLocalDescription(offer); err != nil {
return
}
answer, err := peer.acceptOffer(offer)
if err != nil {
return
}
_ = s.applyRemoteDescription(answer)
}
// waitForPeer polls the room until a peer stream is both connected
// (its byte-channel Connect has returned) and has built its own PC.
// Polls every 10ms; cheap enough for an in-process loopback.
func (s *memoryStream) waitForPeer(ctx context.Context) (*memoryStream, error) {
ticker := time.NewTicker(10 * time.Millisecond)
defer ticker.Stop()
for {
peer := s.room.peerOf(s)
if peer != nil && peer.isConnected() {
peer.pcMu.Lock()
ready := peer.pc != nil
peer.pcMu.Unlock()
if ready {
return peer, nil
}
}
select {
case <-ctx.Done():
return nil, fmt.Errorf("memory loopback waitForPeer: %w", ctx.Err())
case <-ticker.C:
}
}
}
func (s *memoryStream) isConnected() bool {
@@ -331,7 +603,7 @@ func registerMemoryCarrier(t *testing.T) (string, *memoryRoom) {
name := "e2e-memory-" + t.Name()
room := &memoryRoom{streams: make(map[*memoryStream]struct{})}
enginebuiltin.Register(name, func(_ context.Context, cfg enginebuiltin.Config) (engine.Session, error) {
stream := &memoryStream{room: room, onData: cfg.OnData}
stream := newMemoryStream(room, cfg.OnData)
room.mu.Lock()
room.streams[stream] = struct{}{}
room.mu.Unlock()
@@ -340,12 +612,25 @@ func registerMemoryCarrier(t *testing.T) (string, *memoryRoom) {
return name, room
}
// newMemoryStream builds a memoryStream with its own lifecycle context.
// The context lives until Close, so async PC negotiation goroutines see
// it stay alive past streamTransport.Connect's deferred cancel.
func newMemoryStream(room *memoryRoom, onData func([]byte)) *memoryStream {
ctx, cancel := context.WithCancel(context.Background())
return &memoryStream{
room: room,
onData: onData,
streamCtx: ctx,
streamCancel: cancel,
}
}
func registerMemoryCarrierAs(t *testing.T, name string) {
t.Helper()
room := &memoryRoom{streams: make(map[*memoryStream]struct{})}
enginebuiltin.Register(name, func(_ context.Context, cfg enginebuiltin.Config) (engine.Session, error) {
stream := &memoryStream{room: room, onData: cfg.OnData}
stream := newMemoryStream(room, cfg.OnData)
room.mu.Lock()
room.streams[stream] = struct{}{}
room.mu.Unlock()
@@ -682,11 +967,20 @@ func freeLocalAddr(ctx context.Context, t *testing.T) string {
}
func waitForReady(t *testing.T, ready <-chan struct{}) {
t.Helper()
waitForReadyWithin(t, ready, 3*time.Second)
}
// waitForReadyWithin is waitForReady with a caller-chosen budget. Video
// transports (videochannel, seichannel, vp8channel) need a few seconds
// to finish their DTLS+ICE+SDP+smux handshake even on a loopback, so
// callers that exercise those paths pass a longer timeout here.
func waitForReadyWithin(t *testing.T, ready <-chan struct{}, budget time.Duration) {
t.Helper()
select {
case <-ready:
case <-time.After(3 * time.Second):
t.Fatal("client did not become ready")
case <-time.After(budget):
t.Fatalf("client did not become ready within %s", budget)
}
}