diff --git a/internal/engine/jitsi/helpers_test.go b/internal/engine/jitsi/helpers_test.go index 55fd823..c5ee6b1 100644 --- a/internal/engine/jitsi/helpers_test.go +++ b/internal/engine/jitsi/helpers_test.go @@ -18,3 +18,18 @@ func makeBridgeMessage(class string, fields map[string]any) j.BridgeMessage { Fields: fields, } } + +func makeBridgeMessageFrom(class, from string, fields map[string]any) j.BridgeMessage { + return j.BridgeMessage{ + Class: class, + From: from, + Fields: fields, + } +} + +func makeBridgeFrame(t *testing.T, payload []byte) string { + t.Helper() + framed := append([]byte{}, bridgeMagic[:]...) + framed = append(framed, payload...) + return base64.StdEncoding.EncodeToString(framed) +} diff --git a/internal/engine/jitsi/jitsi.go b/internal/engine/jitsi/jitsi.go index 7c9cdaf..bdb10f0 100644 --- a/internal/engine/jitsi/jitsi.go +++ b/internal/engine/jitsi/jitsi.go @@ -16,6 +16,7 @@ package jitsi import ( + "bytes" "context" "encoding/base64" "encoding/xml" @@ -46,6 +47,15 @@ const ( videoTrackName = "videochannel" ) +// bridgeMagic tags every EndpointMessage produced by this engine. JVB broadcasts +// EndpointMessage payloads to every occupant of the MUC; the magic lets the +// receiver discard frames from unrelated applications (or unrelated olcrtc +// processes sharing the same room) before they reach the byte-stream layer. +// Without it, a stray peer's smux/handshake bytes parse as our protocol and +// deadlock the connection. 4 bytes is enough entropy for collision avoidance +// against real-world payloads while keeping the overhead negligible. +var bridgeMagic = [4]byte{'O', 'L', 'R', '1'} //nolint:gochecknoglobals // protocol constant + var ( // ErrSessionClosed is returned when an operation is attempted on a closed session. ErrSessionClosed = errors.New("jitsi session closed") @@ -80,6 +90,12 @@ type Session struct { sendQueue chan []byte bridgeReady atomic.Bool closed atomic.Bool + + // peerEndpoint latches the MUC nick of the first occupant whose + // EndpointMessage passed the bridgeMagic check. Once set, all bridge + // messages from other senders are dropped, isolating us from chatter by + // unrelated olcrtc processes that happen to share the same room. + peerEndpoint atomic.Pointer[string] done chan struct{} doneOnce sync.Once cancel context.CancelFunc @@ -89,6 +105,13 @@ type Session struct { videoTrackMu sync.RWMutex videoTracks []webrtc.TrackLocal onVideoTrack func(*webrtc.TrackRemote, *webrtc.RTPReceiver) + + // peerVideoSSRC latches the SSRC of the first remote video track we + // surfaced to the carrier. JVB forwards every active video source in + // the MUC as a separate TrackRemote; without this latch a third + // participant's video confuses the vp8channel epoch/CRC machinery on + // the receiver side. Once set, additional video tracks are drained. + peerVideoSSRC atomic.Uint32 } // New creates a new Jitsi engine session. @@ -230,6 +253,10 @@ func (s *Session) Connect(ctx context.Context) error { if err != nil { return fmt.Errorf("open bridge: %w", err) } + // Re-latch peer on every bridge open: after a reconnect the partner's + // MUC nick may have changed. + s.peerEndpoint.Store(nil) + s.peerVideoSSRC.Store(0) s.bridgeReady.Store(true) logger.Infof("jitsi: bridge open (endpoints=%v)", jSess.Endpoints()) } @@ -252,6 +279,18 @@ func (s *Session) shouldNegotiatePC() bool { return len(s.videoTracks) > 0 || s.onVideoTrack != nil } +// drainTrack reads and discards RTP from a TrackRemote we chose to ignore so +// pion's per-track receiver buffer doesn't fill up. Returns when the track +// closes. +func drainTrack(track *webrtc.TrackRemote) { + buf := make([]byte, 1500) + for { + if _, _, err := track.Read(buf); err != nil { + return + } + } +} + func (s *Session) videoTrackHandler() func(*webrtc.TrackRemote, *webrtc.RTPReceiver) { s.videoTrackMu.RLock() defer s.videoTrackMu.RUnlock() @@ -337,6 +376,13 @@ func (s *Session) negotiatePC(ctx context.Context, jSess *j.Session) error { if track.Kind() != webrtc.RTPCodecTypeVideo { return } + ssrc := uint32(track.SSRC()) + if !s.peerVideoSSRC.CompareAndSwap(0, ssrc) && s.peerVideoSSRC.Load() != ssrc { + // A different remote participant: drain the track so pion's + // receiver buffer doesn't fill up and back-pressure the SFU. + go drainTrack(track) + return + } if cb := s.videoTrackHandler(); cb != nil { cb(track, recv) } @@ -528,11 +574,14 @@ func (s *Session) Send(data []byte) error { if !s.bridgeReady.Load() { return ErrBridgeNotReady } - if len(data) > bridgeMaxMessageSize { + if len(data)+len(bridgeMagic) > bridgeMaxMessageSize { return ErrSendTooLarge } + framed := make([]byte, len(bridgeMagic)+len(data)) + copy(framed, bridgeMagic[:]) + copy(framed[len(bridgeMagic):], data) select { - case s.sendQueue <- data: + case s.sendQueue <- framed: return nil case <-s.done: return ErrSessionClosed @@ -602,7 +651,26 @@ func (s *Session) deliverBridgeMessage(msg j.BridgeMessage, ok bool) bool { if payload == nil { return true } - s.onData(payload) + if len(payload) < len(bridgeMagic) || !bytes.Equal(payload[:len(bridgeMagic)], bridgeMagic[:]) { + return true + } + // peer-latch: the first sender whose payload survived the magic check + // becomes our partner; everyone else is ignored. Cleared on reconnect by + // the supervisor (peerEndpoint is reset whenever the bridge is reopened). + if cur := s.peerEndpoint.Load(); cur != nil { + if *cur != msg.From { + return true + } + } else if msg.From != "" { + from := msg.From + s.peerEndpoint.CompareAndSwap(nil, &from) + // Re-check after CAS: a concurrent latch may have picked a different + // peer first; if so, drop this frame. + if cur := s.peerEndpoint.Load(); cur != nil && *cur != msg.From { + return true + } + } + s.onData(payload[len(bridgeMagic):]) return true } diff --git a/internal/engine/jitsi/jitsi_test.go b/internal/engine/jitsi/jitsi_test.go index 0990930..4a43049 100644 --- a/internal/engine/jitsi/jitsi_test.go +++ b/internal/engine/jitsi/jitsi_test.go @@ -144,6 +144,45 @@ func TestSanitiseNick(t *testing.T) { } } +func TestDeliverBridgeMessageMagicAndPeerLatch(t *testing.T) { + sess, err := New(context.Background(), engine.Config{ + URL: testHost, + Extra: map[string]string{credentialKeyRoom: testRoom}, + }) + if err != nil { + t.Fatalf("New: %v", err) + } + defer func() { _ = sess.Close() }() + + js := sess.(*Session) + var received [][]byte + js.onData = func(b []byte) { + received = append(received, append([]byte(nil), b...)) + } + + good := makeBridgeFrame(t, []byte("alpha")) + bad := encodeForTest(t, []byte("alpha")) // no magic prefix + + // First valid frame from peerA latches the peer and is delivered. + if !js.deliverBridgeMessage(makeBridgeMessageFrom(classEndpoint, "peerA", map[string]any{rawFieldKey: good}), true) { + t.Fatal("deliverBridgeMessage returned false on valid frame") + } + // Frame without magic is dropped. + js.deliverBridgeMessage(makeBridgeMessageFrom(classEndpoint, "peerA", map[string]any{rawFieldKey: bad}), true) + // Frame from a different sender after latch is dropped even with magic. + js.deliverBridgeMessage(makeBridgeMessageFrom(classEndpoint, "peerB", map[string]any{rawFieldKey: good}), true) + // Another frame from latched peer still flows. + beta := makeBridgeFrame(t, []byte("beta")) + js.deliverBridgeMessage(makeBridgeMessageFrom(classEndpoint, "peerA", map[string]any{rawFieldKey: beta}), true) + + if len(received) != 2 { + t.Fatalf("received frames = %d, want 2 (%q)", len(received), received) + } + if string(received[0]) != "alpha" || string(received[1]) != "beta" { + t.Fatalf("received = %q, want [alpha beta]", received) + } +} + func TestEngineRegistration(t *testing.T) { if _, err := engine.New(context.Background(), "jitsi", engine.Config{ URL: testHost,