From acac1121a7297b9bf25ea4e2de8b74e58cbf4ed4 Mon Sep 17 00:00:00 2001 From: zarazaex69 Date: Sat, 16 May 2026 18:46:58 +0300 Subject: [PATCH] fix(jitsi): add epoch-based bridge frame filtering --- internal/engine/jitsi/helpers_test.go | 10 ++ internal/engine/jitsi/jitsi.go | 146 ++++++++++++++++++++++++-- internal/engine/jitsi/jitsi_test.go | 61 +++++++++++ 3 files changed, 209 insertions(+), 8 deletions(-) diff --git a/internal/engine/jitsi/helpers_test.go b/internal/engine/jitsi/helpers_test.go index e908d10..0dde06b 100644 --- a/internal/engine/jitsi/helpers_test.go +++ b/internal/engine/jitsi/helpers_test.go @@ -2,6 +2,7 @@ package jitsi import ( "encoding/base64" + "encoding/binary" "testing" "github.com/zarazaex69/j" @@ -28,8 +29,17 @@ func makeBridgeMessageFrom(from string, fields map[string]any) j.BridgeMessage { } func makeBridgeFrame(t *testing.T, payload []byte) string { + t.Helper() + return makeBridgeFrameForEpoch(t, 0x10203040, 0, payload) +} + +func makeBridgeFrameForEpoch(t *testing.T, senderEpoch, receiverEpoch uint32, payload []byte) string { t.Helper() framed := append([]byte{}, bridgeMagic[:]...) + var hdr [8]byte + binary.BigEndian.PutUint32(hdr[0:4], senderEpoch) + binary.BigEndian.PutUint32(hdr[4:8], receiverEpoch) + framed = append(framed, hdr[:]...) framed = append(framed, payload...) return base64.StdEncoding.EncodeToString(framed) } diff --git a/internal/engine/jitsi/jitsi.go b/internal/engine/jitsi/jitsi.go index 8c99692..1b24110 100644 --- a/internal/engine/jitsi/jitsi.go +++ b/internal/engine/jitsi/jitsi.go @@ -18,7 +18,9 @@ package jitsi import ( "bytes" "context" + "crypto/rand" "encoding/base64" + "encoding/binary" "encoding/xml" "errors" "fmt" @@ -57,6 +59,7 @@ const ( // deadlock the connection. 4 bytes is enough entropy for collision avoidance // against real-world payloads while keeping the overhead negligible. var bridgeMagic = [4]byte{'O', 'L', 'R', '1'} //nolint:gochecknoglobals // protocol constant +var fallbackEpoch atomic.Uint32 //nolint:gochecknoglobals // crypto/rand fallback counter var ( // ErrSessionClosed is returned when an operation is attempted on a closed session. @@ -97,6 +100,8 @@ type Session struct { reconnectCh chan struct{} lastReconnect time.Time reconnectCount int + localEpoch atomic.Uint32 + peerEpoch atomic.Uint32 // peerEndpoint latches the MUC nick of the first occupant whose // EndpointMessage passed the bridgeMagic check. Once set, all bridge @@ -144,7 +149,7 @@ func New(_ context.Context, cfg engine.Config) (engine.Session, error) { } runCtx, cancel := context.WithCancel(context.Background()) - return &Session{ + s := &Session{ host: host, room: room, name: name, @@ -154,7 +159,9 @@ func New(_ context.Context, cfg engine.Config) (engine.Session, error) { done: make(chan struct{}), cancel: cancel, runCtx: runCtx, - }, nil + } + s.localEpoch.Store(randomEpoch()) + return s, nil } // cyrillicToLatin maps Cyrillic runes to their Latin transliteration strings. @@ -228,6 +235,22 @@ func isNickRune(r rune) bool { return false } +func randomEpoch() uint32 { + var b [4]byte + if _, err := rand.Read(b[:]); err != nil { + v := fallbackEpoch.Add(1) + if v == 0 { + return fallbackEpoch.Add(1) + } + return v + } + v := binary.BigEndian.Uint32(b[:]) + if v == 0 { + return 1 + } + return v +} + // Capabilities reports what this engine can do. func (s *Session) Capabilities() engine.Capabilities { return engine.Capabilities{ByteStream: true, VideoTrack: true} @@ -592,12 +615,37 @@ func (s *Session) Send(data []byte) error { if !s.bridgeReady.Load() { return ErrBridgeNotReady } - if len(data)+len(bridgeMagic) > bridgeMaxMessageSize { + framed, err := s.encodeBridgeFrame(data) + if err != nil { + return err + } + return s.enqueueBridgeFrame(framed) +} + +func (s *Session) encodeBridgeFrame(data []byte) ([]byte, error) { + const epochHeaderLen = 8 + if len(data)+len(bridgeMagic)+epochHeaderLen > bridgeMaxMessageSize { + return nil, ErrSendTooLarge + } + framed := make([]byte, len(bridgeMagic)+epochHeaderLen+len(data)) + copy(framed, bridgeMagic[:]) + off := len(bridgeMagic) + binary.BigEndian.PutUint32(framed[off:off+4], s.localEpoch.Load()) + binary.BigEndian.PutUint32(framed[off+4:off+epochHeaderLen], s.peerEpoch.Load()) + copy(framed[off+epochHeaderLen:], data) + return framed, nil +} + +func (s *Session) enqueueBridgeFrame(framed []byte) error { + if s.closed.Load() { + return ErrSessionClosed + } + if !s.bridgeReady.Load() { + return ErrBridgeNotReady + } + if len(framed) > bridgeMaxMessageSize { return ErrSendTooLarge } - framed := make([]byte, len(bridgeMagic)+len(data)) - copy(framed, bridgeMagic[:]) - copy(framed[len(bridgeMagic):], data) select { case s.sendQueue <- framed: return nil @@ -618,10 +666,16 @@ func (s *Session) sendLoop() { if !ok { return } - jSess := s.jSess.Load() + if !s.outboundFrameCurrent(data) { + continue + } + jSess := s.waitJSession() if jSess == nil { return } + if !s.outboundFrameCurrent(data) { + continue + } if err := jSess.BridgeSendRaw("", data); err != nil { if s.closed.Load() { return @@ -632,6 +686,33 @@ func (s *Session) sendLoop() { } } +func (s *Session) waitJSession() *j.Session { + const retryDelay = 10 * time.Millisecond + for { + if s.closed.Load() { + return nil + } + jSess := s.jSess.Load() + if jSess != nil { + return jSess + } + select { + case <-s.done: + return nil + case <-time.After(retryDelay): + } + } +} + +func (s *Session) outboundFrameCurrent(frame []byte) bool { + const epochHeaderLen = 8 + if len(frame) < len(bridgeMagic)+epochHeaderLen { + return false + } + off := len(bridgeMagic) + return binary.BigEndian.Uint32(frame[off:off+4]) == s.localEpoch.Load() +} + func (s *Session) recvLoop() { defer s.wg.Done() @@ -675,10 +756,44 @@ func (s *Session) deliverBridgeMessage(msg j.BridgeMessage, ok bool) bool { if !s.peerLatchAccepts(msg.From) { return true } - s.onData(payload[len(bridgeMagic):]) + data, ok := s.acceptEpochFrame(payload) + if !ok { + return true + } + if len(data) == 0 { + return true + } + s.onData(data) return true } +func (s *Session) acceptEpochFrame(payload []byte) ([]byte, bool) { + const epochHeaderLen = 8 + if len(payload) < len(bridgeMagic)+epochHeaderLen { + return nil, false + } + off := len(bridgeMagic) + senderEpoch := binary.BigEndian.Uint32(payload[off : off+4]) + receiverEpoch := binary.BigEndian.Uint32(payload[off+4 : off+epochHeaderLen]) + if senderEpoch == 0 || senderEpoch == s.localEpoch.Load() { + return nil, false + } + if receiverEpoch != 0 && receiverEpoch != s.localEpoch.Load() { + logger.Debugf("jitsi: drop stale bridge frame peerEpoch=0x%08x localEpoch=0x%08x", + receiverEpoch, s.localEpoch.Load()) + return nil, false + } + if prev := s.peerEpoch.Load(); prev == 0 { + s.peerEpoch.Store(senderEpoch) + } else if prev != senderEpoch { + if s.peerEpoch.CompareAndSwap(prev, senderEpoch) { + s.requestReconnect("jitsi peer epoch changed") + } + return nil, false + } + return payload[off+epochHeaderLen:], true +} + // peerLatchAccepts implements the peer-latch logic: the first sender whose // payload survived the magic check becomes our partner; everyone else is // ignored. Cleared on reconnect by the supervisor (peerEndpoint is reset @@ -879,6 +994,8 @@ func (s *Session) reconnect(ctx context.Context) error { if oldPC != nil { _ = oldPC.Close() } + s.localEpoch.Store(randomEpoch()) + s.drainSendQueue() logger.Infof("jitsi: reconnecting %s/%s as %s ...", s.host, s.room, s.name) jSess, err := s.joinAndOpenBridge(ctx) @@ -889,6 +1006,9 @@ func (s *Session) reconnect(ctx context.Context) error { s.peerEndpoint.Store(nil) s.peerVideoSSRC.Store(0) s.bridgeReady.Store(true) + if err := s.Send(nil); err != nil { + logger.Debugf("jitsi: epoch announce failed: %v", err) + } s.wg.Add(1) go s.recvLoop() @@ -910,6 +1030,16 @@ func (s *Session) drainReconnectQueue() { } } +func (s *Session) drainSendQueue() { + for { + select { + case <-s.sendQueue: + default: + return + } + } +} + // CanSend reports whether the session is ready to accept new data. func (s *Session) CanSend() bool { if s.closed.Load() { diff --git a/internal/engine/jitsi/jitsi_test.go b/internal/engine/jitsi/jitsi_test.go index e07f2ce..fa8aa61 100644 --- a/internal/engine/jitsi/jitsi_test.go +++ b/internal/engine/jitsi/jitsi_test.go @@ -188,6 +188,67 @@ func TestDeliverBridgeMessageMagicAndPeerLatch(t *testing.T) { } } +func TestDeliverBridgeMessageDropsStalePeerEpoch(t *testing.T) { + sess, err := New(context.Background(), engine.Config{ + URL: testHost, + Extra: map[string]string{credentialKeyRoom: testRoom}, + }) + if err != nil { + t.Fatalf("New: %v", err) + } + defer func() { _ = sess.Close() }() + + js, ok := sess.(*Session) + if !ok { + t.Fatal("sess is not *Session") + } + js.localEpoch.Store(0x2222) + delivered := false + js.onData = func([]byte) { delivered = true } + + stale := makeBridgeFrameForEpoch(t, 0x1111, 0xaaaa, []byte("old-smux")) + js.deliverBridgeMessage(makeBridgeMessageFrom("peerA", map[string]any{rawFieldKey: stale}), true) + if delivered { + t.Fatal("stale peer-epoch frame was delivered") + } +} + +func TestDeliverBridgeMessagePeerEpochChangeRequestsReconnect(t *testing.T) { + sess, err := New(context.Background(), engine.Config{ + URL: testHost, + Extra: map[string]string{credentialKeyRoom: testRoom}, + }) + if err != nil { + t.Fatalf("New: %v", err) + } + defer func() { _ = sess.Close() }() + + js, ok := sess.(*Session) + if !ok { + t.Fatal("sess is not *Session") + } + js.localEpoch.Store(0x3333) + js.SetShouldReconnect(func() bool { return true }) + var received [][]byte + js.onData = func(b []byte) { + received = append(received, append([]byte(nil), b...)) + } + + first := makeBridgeFrameForEpoch(t, 0x1111, 0, []byte("first")) + js.deliverBridgeMessage(makeBridgeMessageFrom("peerA", map[string]any{rawFieldKey: first}), true) + changed := makeBridgeFrameForEpoch(t, 0x2222, 0x3333, nil) + js.deliverBridgeMessage(makeBridgeMessageFrom("peerA", map[string]any{rawFieldKey: changed}), true) + + if len(received) != 1 || string(received[0]) != "first" { + t.Fatalf("received = %q, want only first payload", received) + } + select { + case <-js.reconnectCh: + case <-time.After(time.Second): + t.Fatal("peer epoch change did not request reconnect") + } +} + func TestBridgeCloseRequestsReconnect(t *testing.T) { sess, err := New(context.Background(), engine.Config{ URL: testHost,