From 4d22169be629e6edca2ee1562fe8f3eacf946774 Mon Sep 17 00:00:00 2001 From: zarazaex69 Date: Sun, 24 May 2026 04:45:06 +0300 Subject: [PATCH] test(e2e): add in-memory WebRTC loopback for video transport testing --- internal/e2e/local_soak_test.go | 5 +- internal/e2e/pion_loopback_poc_test.go | 157 ++++++++++++ internal/e2e/tunnel_test.go | 318 ++++++++++++++++++++++++- 3 files changed, 467 insertions(+), 13 deletions(-) create mode 100644 internal/e2e/pion_loopback_poc_test.go diff --git a/internal/e2e/local_soak_test.go b/internal/e2e/local_soak_test.go index 8be48b4..a33dc5a 100644 --- a/internal/e2e/local_soak_test.go +++ b/internal/e2e/local_soak_test.go @@ -236,7 +236,10 @@ func startLocalSoakTunnel(t *testing.T, transportName string) *tunnelRuntime { DNSServer: localDNSServer, }, func() { close(ready) }) }() - waitForReady(t, ready) + // Video transports go through ICE+DTLS+SRTP+smux before becoming + // ready, so reuse the longer setup budget the soak test allows for + // the SOCKS connect. + waitForReadyWithin(t, ready, 30*time.Second) return &tunnelRuntime{ socksAddr: socksAddr, diff --git a/internal/e2e/pion_loopback_poc_test.go b/internal/e2e/pion_loopback_poc_test.go new file mode 100644 index 0000000..a08807a --- /dev/null +++ b/internal/e2e/pion_loopback_poc_test.go @@ -0,0 +1,157 @@ +package e2e + +import ( + "context" + "errors" + "testing" + "time" + + "github.com/pion/webrtc/v4" + "github.com/pion/webrtc/v4/pkg/media" +) + +// errPionLoopbackNoRTP signals that the receiving PeerConnection never +// surfaced a single RTP packet during the proof-of-concept window. +var errPionLoopbackNoRTP = errors.New("pion loopback: no RTP delivered") + +// vp8KeyframeStub is the smallest byte sequence pion accepts as a VP8 +// keyframe sample. It is meaningless visually but valid enough that the +// depacketizer/sample builder on the receiving side accepts the RTP +// stream and OnTrack fires. We are not testing video quality here — only +// that two PeerConnections inside the same process can negotiate, gather +// loopback ICE candidates, and exchange RTP without an external bridge. +// +//nolint:gochecknoglobals // test-only constant byte literal +var vp8KeyframeStub = []byte{ + 0x10, 0x00, 0x00, 0x9d, 0x01, 0x2a, 0x40, 0x01, 0xf0, 0x00, +} + +// TestPionLoopbackPOC is a focused proof-of-concept: spin up two pion +// PeerConnections inside one process, wire them together with manual +// SDP+trickle-ICE exchange, publish a VP8 track on one side, and wait +// for OnTrack to surface real RTP on the other side. If this test +// passes, the same wiring can be embedded into memoryStream so that +// videochannel/seichannel/vp8channel transports complete their +// handshake under the in-memory carrier used by the e2e suite. +// +// The test is intentionally small and self-contained — no production +// code is touched, no e2e fixtures are involved. +// +//nolint:gocognit,cyclop // PoC is intentionally linear; splitting helpers hurts readability +func TestPionLoopbackPOC(t *testing.T) { + ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second) + defer cancel() + + api := webrtc.NewAPI() + + pcOffer, err := api.NewPeerConnection(webrtc.Configuration{}) + if err != nil { + t.Fatalf("new offer pc: %v", err) + } + t.Cleanup(func() { _ = pcOffer.Close() }) + + pcAnswer, err := api.NewPeerConnection(webrtc.Configuration{}) + if err != nil { + t.Fatalf("new answer pc: %v", err) + } + t.Cleanup(func() { _ = pcAnswer.Close() }) + + // Trickle ICE: forward every gathered candidate straight to the peer. + // A nil candidate terminates gathering and is intentionally ignored. + pcOffer.OnICECandidate(func(c *webrtc.ICECandidate) { + if c == nil { + return + } + if err := pcAnswer.AddICECandidate(c.ToJSON()); err != nil { + t.Logf("answer AddICECandidate: %v", err) + } + }) + pcAnswer.OnICECandidate(func(c *webrtc.ICECandidate) { + if c == nil { + return + } + if err := pcOffer.AddICECandidate(c.ToJSON()); err != nil { + t.Logf("offer AddICECandidate: %v", err) + } + }) + + gotRTP := make(chan struct{}) + pcAnswer.OnTrack(func(track *webrtc.TrackRemote, _ *webrtc.RTPReceiver) { + buf := make([]byte, 1500) + // First read confirms RTP is actually flowing end-to-end. + if _, _, err := track.Read(buf); err != nil { + t.Logf("answer first Read: %v", err) + return + } + select { + case <-gotRTP: + default: + close(gotRTP) + } + // Keep draining so pion's per-track buffer doesn't back up. + for { + if _, _, err := track.Read(buf); err != nil { + return + } + } + }) + + localTrack, err := webrtc.NewTrackLocalStaticSample( + webrtc.RTPCodecCapability{MimeType: webrtc.MimeTypeVP8, ClockRate: 90000}, + "poc-stream", + "poc-track", + ) + if err != nil { + t.Fatalf("new local track: %v", err) + } + if _, err := pcOffer.AddTrack(localTrack); err != nil { + t.Fatalf("offer AddTrack: %v", err) + } + + offer, err := pcOffer.CreateOffer(nil) + if err != nil { + t.Fatalf("create offer: %v", err) + } + if err := pcOffer.SetLocalDescription(offer); err != nil { + t.Fatalf("offer SetLocalDescription: %v", err) + } + if err := pcAnswer.SetRemoteDescription(offer); err != nil { + t.Fatalf("answer SetRemoteDescription: %v", err) + } + answer, err := pcAnswer.CreateAnswer(nil) + if err != nil { + t.Fatalf("create answer: %v", err) + } + if err := pcAnswer.SetLocalDescription(answer); err != nil { + t.Fatalf("answer SetLocalDescription: %v", err) + } + if err := pcOffer.SetRemoteDescription(answer); err != nil { + t.Fatalf("offer SetRemoteDescription: %v", err) + } + + // Pump tiny VP8 keyframe stubs at 20 fps until OnTrack fires or + // the test times out. Real frame content is irrelevant — we only + // care that RTP packets traverse the loopback ICE/DTLS/SRTP stack. + go func() { + ticker := time.NewTicker(50 * time.Millisecond) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + _ = localTrack.WriteSample(media.Sample{ + Data: vp8KeyframeStub, + Duration: 50 * time.Millisecond, + }) + } + } + }() + + select { + case <-gotRTP: + t.Logf("pion loopback delivered RTP successfully") + case <-ctx.Done(): + t.Fatalf("%v: timed out after %s", errPionLoopbackNoRTP, 15*time.Second) + } +} diff --git a/internal/e2e/tunnel_test.go b/internal/e2e/tunnel_test.go index 151510a..7f364f7 100644 --- a/internal/e2e/tunnel_test.go +++ b/internal/e2e/tunnel_test.go @@ -29,6 +29,7 @@ import ( "github.com/openlibrecommunity/olcrtc/internal/transport/seichannel" "github.com/openlibrecommunity/olcrtc/internal/transport/videochannel" "github.com/openlibrecommunity/olcrtc/internal/transport/vp8channel" + pioninterceptor "github.com/pion/interceptor" "github.com/pion/webrtc/v4" ) @@ -46,13 +47,14 @@ const ( ) var ( - errRealE2ENotReady = errors.New("real e2e client did not become ready") - errTunnelDidNotStop = errors.New("tunnel goroutine did not stop") - errRealE2EEchoMismatch = errors.New("real e2e echo payload mismatch") - errSocksUnexpectedReply = errors.New("unexpected SOCKS5 reply") - errSocksUnexpectedHello = errors.New("unexpected SOCKS5 greeting") - errPayloadMismatchOffset = errors.New("payload mismatch at offset") - errFailoverCarrier = errors.New("intentional failover carrier failure") + errRealE2ENotReady = errors.New("real e2e client did not become ready") + errTunnelDidNotStop = errors.New("tunnel goroutine did not stop") + errRealE2EEchoMismatch = errors.New("real e2e echo payload mismatch") + errSocksUnexpectedReply = errors.New("unexpected SOCKS5 reply") + errSocksUnexpectedHello = errors.New("unexpected SOCKS5 greeting") + errPayloadMismatchOffset = errors.New("payload mismatch at offset") + errFailoverCarrier = errors.New("intentional failover carrier failure") + errMemoryLoopbackPCClosed = errors.New("memory loopback: PC closed during SDP exchange") errServerExitedBeforeClientStart = errors.New("server exited cleanly before client start") errClientExitedBeforeReady = errors.New("client exited cleanly before ready") @@ -181,6 +183,35 @@ func (r *memoryRoom) triggerEnded(reason string) { } } +// peerOf returns the other stream in a 2-party room or nil if there is +// no peer (yet). Video loopback relies on a single 1:1 partner so we +// just pick the first non-self stream we see. +func (r *memoryRoom) peerOf(s *memoryStream) *memoryStream { + r.mu.Lock() + defer r.mu.Unlock() + for stream := range r.streams { + if stream != s { + return stream + } + } + return nil +} + +// isFirstStream reports whether s is the only stream currently in the +// room. Used to deterministically pick the SDP offerer: the first +// stream to register builds its PC first and becomes the offerer. +func (r *memoryRoom) isFirstStream(s *memoryStream) bool { + r.mu.Lock() + defer r.mu.Unlock() + if len(r.streams) == 0 { + return true + } + if _, ok := r.streams[s]; ok && len(r.streams) == 1 { + return true + } + return false +} + type memoryStream struct { room *memoryRoom onData func([]byte) @@ -193,9 +224,31 @@ type memoryStream struct { track webrtc.TrackLocal trackCB func(*webrtc.TrackRemote, *webrtc.RTPReceiver) pending [][]byte + + // Video-track plumbing. Until AddVideoTrack or + // SetVideoTrackHandler is called the embedded *webrtc.PeerConnection + // stays nil and memoryStream behaves as the original byte-only + // carrier. Once a video transport touches us we lazily build a real + // pion PC; SDP/ICE exchange with the peer stream is plumbed through + // the parent room, so two streams in the same room loopback their + // video tracks through a real (in-process) WebRTC stack. + // + // streamCtx is owned by the stream itself (cancelled in Close), not + // by the short-lived ctx that Connect receives — the video + // transport's connectCtx fires its deferred cancel as soon as + // streamTransport.Connect returns, which would otherwise tear down + // the async SDP negotiation goroutine before it can find its peer. + streamCtx context.Context //nolint:containedctx // owned lifecycle ctx for the loopback PC + streamCancel context.CancelFunc + pcMu sync.Mutex + pc *webrtc.PeerConnection + isOfferer bool + negotiateOnce sync.Once + pendingICE []webrtc.ICECandidateInit + remoteDescSet bool } -func (s *memoryStream) Connect(context.Context) error { +func (s *memoryStream) Connect(_ context.Context) error { s.mu.Lock() s.connected = true pending := s.pending @@ -207,6 +260,17 @@ func (s *memoryStream) Connect(context.Context) error { onData(payload) } } + + // If a video transport already touched us, kick off the offerer-side + // SDP negotiation asynchronously. Connect itself stays fast so that + // room.waitConnected (which only checks that the byte-channel side is + // up) doesn't deadlock waiting for a peer that hasn't joined yet. + s.pcMu.Lock() + needNegotiate := s.pc != nil && s.isOfferer + s.pcMu.Unlock() + if needNegotiate { + go s.negotiateOnce.Do(func() { s.runNegotiation(s.streamCtx) }) + } return nil } @@ -254,6 +318,18 @@ func (s *memoryStream) Close() error { s.closed = true s.connected = false s.mu.Unlock() + + if s.streamCancel != nil { + s.streamCancel() + } + + s.pcMu.Lock() + pc := s.pc + s.pc = nil + s.pcMu.Unlock() + if pc != nil { + _ = pc.Close() + } return nil } @@ -289,6 +365,14 @@ func (s *memoryStream) AddVideoTrack(track webrtc.TrackLocal) error { s.mu.Lock() s.track = track s.mu.Unlock() + + pc, err := s.ensurePC() + if err != nil { + return err + } + if _, err := pc.AddTrack(track); err != nil { + return fmt.Errorf("memory loopback AddTrack: %w", err) + } return nil } @@ -296,6 +380,194 @@ func (s *memoryStream) SetVideoTrackHandler(cb func(*webrtc.TrackRemote, *webrtc s.mu.Lock() s.trackCB = cb s.mu.Unlock() + + // Ensuring the PC up-front lets pion start receiving via the + // transceiver that AddTrack on the peer side will create even if + // SetVideoTrackHandler arrives before AddVideoTrack. + if _, err := s.ensurePC(); err != nil { + // e2e helper: swallow — the failure surfaces on the next + // AddVideoTrack/Connect path that actually needs the PC. + _ = err + } +} + +// ensurePC lazily creates the in-process *webrtc.PeerConnection used to +// loopback video tracks to the peer stream in the same memoryRoom. +// Safe to call from any code path; first caller wins. +func (s *memoryStream) ensurePC() (*webrtc.PeerConnection, error) { + s.pcMu.Lock() + if s.pc != nil { + pc := s.pc + s.pcMu.Unlock() + return pc, nil + } + s.pcMu.Unlock() + + mediaEngine := &webrtc.MediaEngine{} + if err := mediaEngine.RegisterDefaultCodecs(); err != nil { + return nil, fmt.Errorf("memory loopback RegisterDefaultCodecs: %w", err) + } + // Empty interceptor registry: NACK/RR/SR add nothing useful on an + // in-process loopback and just cost CPU. + api := webrtc.NewAPI( + webrtc.WithMediaEngine(mediaEngine), + webrtc.WithInterceptorRegistry(&pioninterceptor.Registry{}), + ) + pc, err := api.NewPeerConnection(webrtc.Configuration{}) + if err != nil { + return nil, fmt.Errorf("memory loopback NewPeerConnection: %w", err) + } + + s.pcMu.Lock() + if s.pc != nil { + // Lost the race: someone else built a PC concurrently. + existing := s.pc + s.pcMu.Unlock() + _ = pc.Close() + return existing, nil + } + s.pc = pc + s.isOfferer = s.room.isFirstStream(s) + s.pcMu.Unlock() + + pc.OnICECandidate(func(c *webrtc.ICECandidate) { + if c == nil { + return + } + peer := s.room.peerOf(s) + if peer == nil { + return + } + peer.acceptICE(c.ToJSON()) + }) + pc.OnTrack(func(remote *webrtc.TrackRemote, recv *webrtc.RTPReceiver) { + s.mu.Lock() + cb := s.trackCB + s.mu.Unlock() + if cb != nil { + cb(remote, recv) + } + }) + return pc, nil +} + +// acceptICE adds a remote ICE candidate to our PC. If the remote SDP +// hasn't been applied yet the candidate is queued and flushed by +// applyRemoteDescription. This matches the standard trickle-ICE +// buffering pattern that pion would otherwise reject with +// ErrPeerConnectionRemoteDescriptionNil. +func (s *memoryStream) acceptICE(c webrtc.ICECandidateInit) { + s.pcMu.Lock() + if s.pc == nil { + s.pcMu.Unlock() + return + } + if !s.remoteDescSet { + s.pendingICE = append(s.pendingICE, c) + s.pcMu.Unlock() + return + } + pc := s.pc + s.pcMu.Unlock() + _ = pc.AddICECandidate(c) +} + +func (s *memoryStream) applyRemoteDescription(sdp webrtc.SessionDescription) error { + s.pcMu.Lock() + pc := s.pc + s.pcMu.Unlock() + if pc == nil { + return fmt.Errorf("%w (remote description)", errMemoryLoopbackPCClosed) + } + if err := pc.SetRemoteDescription(sdp); err != nil { + return fmt.Errorf("memory loopback SetRemoteDescription: %w", err) + } + s.pcMu.Lock() + pending := s.pendingICE + s.pendingICE = nil + s.remoteDescSet = true + s.pcMu.Unlock() + for _, c := range pending { + _ = pc.AddICECandidate(c) + } + return nil +} + +// acceptOffer is the answerer half of the SDP exchange. The offerer +// invokes this synchronously on the peer through the parent room. +func (s *memoryStream) acceptOffer(offer webrtc.SessionDescription) (webrtc.SessionDescription, error) { + if err := s.applyRemoteDescription(offer); err != nil { + return webrtc.SessionDescription{}, err + } + s.pcMu.Lock() + pc := s.pc + s.pcMu.Unlock() + if pc == nil { + return webrtc.SessionDescription{}, fmt.Errorf("%w (answer)", errMemoryLoopbackPCClosed) + } + answer, err := pc.CreateAnswer(nil) + if err != nil { + return webrtc.SessionDescription{}, fmt.Errorf("memory loopback CreateAnswer: %w", err) + } + if err := pc.SetLocalDescription(answer); err != nil { + return webrtc.SessionDescription{}, fmt.Errorf("memory loopback answer SetLocalDescription: %w", err) + } + return answer, nil +} + +// runNegotiation is the offerer-side flow. It waits for the peer stream +// to be connected (so its PC and any local tracks are in place), then +// drives one SDP offer/answer cycle. ICE candidates flow through +// acceptICE in the background. +func (s *memoryStream) runNegotiation(ctx context.Context) { + peer, err := s.waitForPeer(ctx) + if err != nil { + return + } + + s.pcMu.Lock() + pc := s.pc + s.pcMu.Unlock() + if pc == nil { + return + } + + offer, err := pc.CreateOffer(nil) + if err != nil { + return + } + if err := pc.SetLocalDescription(offer); err != nil { + return + } + answer, err := peer.acceptOffer(offer) + if err != nil { + return + } + _ = s.applyRemoteDescription(answer) +} + +// waitForPeer polls the room until a peer stream is both connected +// (its byte-channel Connect has returned) and has built its own PC. +// Polls every 10ms; cheap enough for an in-process loopback. +func (s *memoryStream) waitForPeer(ctx context.Context) (*memoryStream, error) { + ticker := time.NewTicker(10 * time.Millisecond) + defer ticker.Stop() + for { + peer := s.room.peerOf(s) + if peer != nil && peer.isConnected() { + peer.pcMu.Lock() + ready := peer.pc != nil + peer.pcMu.Unlock() + if ready { + return peer, nil + } + } + select { + case <-ctx.Done(): + return nil, fmt.Errorf("memory loopback waitForPeer: %w", ctx.Err()) + case <-ticker.C: + } + } } func (s *memoryStream) isConnected() bool { @@ -331,7 +603,7 @@ func registerMemoryCarrier(t *testing.T) (string, *memoryRoom) { name := "e2e-memory-" + t.Name() room := &memoryRoom{streams: make(map[*memoryStream]struct{})} enginebuiltin.Register(name, func(_ context.Context, cfg enginebuiltin.Config) (engine.Session, error) { - stream := &memoryStream{room: room, onData: cfg.OnData} + stream := newMemoryStream(room, cfg.OnData) room.mu.Lock() room.streams[stream] = struct{}{} room.mu.Unlock() @@ -340,12 +612,25 @@ func registerMemoryCarrier(t *testing.T) (string, *memoryRoom) { return name, room } +// newMemoryStream builds a memoryStream with its own lifecycle context. +// The context lives until Close, so async PC negotiation goroutines see +// it stay alive past streamTransport.Connect's deferred cancel. +func newMemoryStream(room *memoryRoom, onData func([]byte)) *memoryStream { + ctx, cancel := context.WithCancel(context.Background()) + return &memoryStream{ + room: room, + onData: onData, + streamCtx: ctx, + streamCancel: cancel, + } +} + func registerMemoryCarrierAs(t *testing.T, name string) { t.Helper() room := &memoryRoom{streams: make(map[*memoryStream]struct{})} enginebuiltin.Register(name, func(_ context.Context, cfg enginebuiltin.Config) (engine.Session, error) { - stream := &memoryStream{room: room, onData: cfg.OnData} + stream := newMemoryStream(room, cfg.OnData) room.mu.Lock() room.streams[stream] = struct{}{} room.mu.Unlock() @@ -682,11 +967,20 @@ func freeLocalAddr(ctx context.Context, t *testing.T) string { } func waitForReady(t *testing.T, ready <-chan struct{}) { + t.Helper() + waitForReadyWithin(t, ready, 3*time.Second) +} + +// waitForReadyWithin is waitForReady with a caller-chosen budget. Video +// transports (videochannel, seichannel, vp8channel) need a few seconds +// to finish their DTLS+ICE+SDP+smux handshake even on a loopback, so +// callers that exercise those paths pass a longer timeout here. +func waitForReadyWithin(t *testing.T, ready <-chan struct{}, budget time.Duration) { t.Helper() select { case <-ready: - case <-time.After(3 * time.Second): - t.Fatal("client did not become ready") + case <-time.After(budget): + t.Fatalf("client did not become ready within %s", budget) } }