fix(jitsi): isolate bridge and video to one peer

This commit is contained in:
zarazaex69
2026-05-16 05:29:51 +03:00
parent a636236523
commit d80d725d5e
3 changed files with 125 additions and 3 deletions

View File

@@ -18,3 +18,18 @@ func makeBridgeMessage(class string, fields map[string]any) j.BridgeMessage {
Fields: fields,
}
}
func makeBridgeMessageFrom(class, from string, fields map[string]any) j.BridgeMessage {
return j.BridgeMessage{
Class: class,
From: from,
Fields: fields,
}
}
func makeBridgeFrame(t *testing.T, payload []byte) string {
t.Helper()
framed := append([]byte{}, bridgeMagic[:]...)
framed = append(framed, payload...)
return base64.StdEncoding.EncodeToString(framed)
}

View File

@@ -16,6 +16,7 @@
package jitsi
import (
"bytes"
"context"
"encoding/base64"
"encoding/xml"
@@ -46,6 +47,15 @@ const (
videoTrackName = "videochannel"
)
// bridgeMagic tags every EndpointMessage produced by this engine. JVB broadcasts
// EndpointMessage payloads to every occupant of the MUC; the magic lets the
// receiver discard frames from unrelated applications (or unrelated olcrtc
// processes sharing the same room) before they reach the byte-stream layer.
// Without it, a stray peer's smux/handshake bytes parse as our protocol and
// deadlock the connection. 4 bytes is enough entropy for collision avoidance
// against real-world payloads while keeping the overhead negligible.
var bridgeMagic = [4]byte{'O', 'L', 'R', '1'} //nolint:gochecknoglobals // protocol constant
var (
// ErrSessionClosed is returned when an operation is attempted on a closed session.
ErrSessionClosed = errors.New("jitsi session closed")
@@ -80,6 +90,12 @@ type Session struct {
sendQueue chan []byte
bridgeReady atomic.Bool
closed atomic.Bool
// peerEndpoint latches the MUC nick of the first occupant whose
// EndpointMessage passed the bridgeMagic check. Once set, all bridge
// messages from other senders are dropped, isolating us from chatter by
// unrelated olcrtc processes that happen to share the same room.
peerEndpoint atomic.Pointer[string]
done chan struct{}
doneOnce sync.Once
cancel context.CancelFunc
@@ -89,6 +105,13 @@ type Session struct {
videoTrackMu sync.RWMutex
videoTracks []webrtc.TrackLocal
onVideoTrack func(*webrtc.TrackRemote, *webrtc.RTPReceiver)
// peerVideoSSRC latches the SSRC of the first remote video track we
// surfaced to the carrier. JVB forwards every active video source in
// the MUC as a separate TrackRemote; without this latch a third
// participant's video confuses the vp8channel epoch/CRC machinery on
// the receiver side. Once set, additional video tracks are drained.
peerVideoSSRC atomic.Uint32
}
// New creates a new Jitsi engine session.
@@ -230,6 +253,10 @@ func (s *Session) Connect(ctx context.Context) error {
if err != nil {
return fmt.Errorf("open bridge: %w", err)
}
// Re-latch peer on every bridge open: after a reconnect the partner's
// MUC nick may have changed.
s.peerEndpoint.Store(nil)
s.peerVideoSSRC.Store(0)
s.bridgeReady.Store(true)
logger.Infof("jitsi: bridge open (endpoints=%v)", jSess.Endpoints())
}
@@ -252,6 +279,18 @@ func (s *Session) shouldNegotiatePC() bool {
return len(s.videoTracks) > 0 || s.onVideoTrack != nil
}
// drainTrack reads and discards RTP from a TrackRemote we chose to ignore so
// pion's per-track receiver buffer doesn't fill up. Returns when the track
// closes.
func drainTrack(track *webrtc.TrackRemote) {
buf := make([]byte, 1500)
for {
if _, _, err := track.Read(buf); err != nil {
return
}
}
}
func (s *Session) videoTrackHandler() func(*webrtc.TrackRemote, *webrtc.RTPReceiver) {
s.videoTrackMu.RLock()
defer s.videoTrackMu.RUnlock()
@@ -337,6 +376,13 @@ func (s *Session) negotiatePC(ctx context.Context, jSess *j.Session) error {
if track.Kind() != webrtc.RTPCodecTypeVideo {
return
}
ssrc := uint32(track.SSRC())
if !s.peerVideoSSRC.CompareAndSwap(0, ssrc) && s.peerVideoSSRC.Load() != ssrc {
// A different remote participant: drain the track so pion's
// receiver buffer doesn't fill up and back-pressure the SFU.
go drainTrack(track)
return
}
if cb := s.videoTrackHandler(); cb != nil {
cb(track, recv)
}
@@ -528,11 +574,14 @@ func (s *Session) Send(data []byte) error {
if !s.bridgeReady.Load() {
return ErrBridgeNotReady
}
if len(data) > bridgeMaxMessageSize {
if len(data)+len(bridgeMagic) > bridgeMaxMessageSize {
return ErrSendTooLarge
}
framed := make([]byte, len(bridgeMagic)+len(data))
copy(framed, bridgeMagic[:])
copy(framed[len(bridgeMagic):], data)
select {
case s.sendQueue <- data:
case s.sendQueue <- framed:
return nil
case <-s.done:
return ErrSessionClosed
@@ -602,7 +651,26 @@ func (s *Session) deliverBridgeMessage(msg j.BridgeMessage, ok bool) bool {
if payload == nil {
return true
}
s.onData(payload)
if len(payload) < len(bridgeMagic) || !bytes.Equal(payload[:len(bridgeMagic)], bridgeMagic[:]) {
return true
}
// peer-latch: the first sender whose payload survived the magic check
// becomes our partner; everyone else is ignored. Cleared on reconnect by
// the supervisor (peerEndpoint is reset whenever the bridge is reopened).
if cur := s.peerEndpoint.Load(); cur != nil {
if *cur != msg.From {
return true
}
} else if msg.From != "" {
from := msg.From
s.peerEndpoint.CompareAndSwap(nil, &from)
// Re-check after CAS: a concurrent latch may have picked a different
// peer first; if so, drop this frame.
if cur := s.peerEndpoint.Load(); cur != nil && *cur != msg.From {
return true
}
}
s.onData(payload[len(bridgeMagic):])
return true
}

View File

@@ -144,6 +144,45 @@ func TestSanitiseNick(t *testing.T) {
}
}
func TestDeliverBridgeMessageMagicAndPeerLatch(t *testing.T) {
sess, err := New(context.Background(), engine.Config{
URL: testHost,
Extra: map[string]string{credentialKeyRoom: testRoom},
})
if err != nil {
t.Fatalf("New: %v", err)
}
defer func() { _ = sess.Close() }()
js := sess.(*Session)
var received [][]byte
js.onData = func(b []byte) {
received = append(received, append([]byte(nil), b...))
}
good := makeBridgeFrame(t, []byte("alpha"))
bad := encodeForTest(t, []byte("alpha")) // no magic prefix
// First valid frame from peerA latches the peer and is delivered.
if !js.deliverBridgeMessage(makeBridgeMessageFrom(classEndpoint, "peerA", map[string]any{rawFieldKey: good}), true) {
t.Fatal("deliverBridgeMessage returned false on valid frame")
}
// Frame without magic is dropped.
js.deliverBridgeMessage(makeBridgeMessageFrom(classEndpoint, "peerA", map[string]any{rawFieldKey: bad}), true)
// Frame from a different sender after latch is dropped even with magic.
js.deliverBridgeMessage(makeBridgeMessageFrom(classEndpoint, "peerB", map[string]any{rawFieldKey: good}), true)
// Another frame from latched peer still flows.
beta := makeBridgeFrame(t, []byte("beta"))
js.deliverBridgeMessage(makeBridgeMessageFrom(classEndpoint, "peerA", map[string]any{rawFieldKey: beta}), true)
if len(received) != 2 {
t.Fatalf("received frames = %d, want 2 (%q)", len(received), received)
}
if string(received[0]) != "alpha" || string(received[1]) != "beta" {
t.Fatalf("received = %q, want [alpha beta]", received)
}
}
func TestEngineRegistration(t *testing.T) {
if _, err := engine.New(context.Background(), "jitsi", engine.Config{
URL: testHost,