fix(jitsi): add epoch-based bridge frame filtering

This commit is contained in:
zarazaex69
2026-05-16 18:46:58 +03:00
parent 07b86a7559
commit acac1121a7
3 changed files with 209 additions and 8 deletions

View File

@@ -2,6 +2,7 @@ package jitsi
import (
"encoding/base64"
"encoding/binary"
"testing"
"github.com/zarazaex69/j"
@@ -28,8 +29,17 @@ func makeBridgeMessageFrom(from string, fields map[string]any) j.BridgeMessage {
}
// makeBridgeFrame builds a base64-encoded bridge frame around payload using
// a fixed, non-zero sender epoch and a receiver epoch of zero. It is a
// convenience wrapper over makeBridgeFrameForEpoch for tests that do not
// care about specific epoch values.
func makeBridgeFrame(t *testing.T, payload []byte) string {
	t.Helper()
	const (
		senderEpoch   uint32 = 0x10203040
		receiverEpoch uint32 = 0
	)
	return makeBridgeFrameForEpoch(t, senderEpoch, receiverEpoch, payload)
}
// makeBridgeFrameForEpoch assembles a bridge frame for tests and returns it
// base64-encoded (standard alphabet, padded).
//
// Layout of the raw frame: bridgeMagic, senderEpoch as a big-endian uint32,
// receiverEpoch as a big-endian uint32, then payload verbatim.
func makeBridgeFrameForEpoch(t *testing.T, senderEpoch, receiverEpoch uint32, payload []byte) string {
	t.Helper()
	const epochHeaderLen = 8 // two uint32 epoch words
	raw := make([]byte, 0, len(bridgeMagic)+epochHeaderLen+len(payload))
	raw = append(raw, bridgeMagic[:]...)
	raw = binary.BigEndian.AppendUint32(raw, senderEpoch)
	raw = binary.BigEndian.AppendUint32(raw, receiverEpoch)
	raw = append(raw, payload...)
	return base64.StdEncoding.EncodeToString(raw)
}

View File

@@ -18,7 +18,9 @@ package jitsi
import (
"bytes"
"context"
"crypto/rand"
"encoding/base64"
"encoding/binary"
"encoding/xml"
"errors"
"fmt"
@@ -57,6 +59,7 @@ const (
// deadlock the connection. 4 bytes is enough entropy for collision avoidance
// against real-world payloads while keeping the overhead negligible.
var bridgeMagic = [4]byte{'O', 'L', 'R', '1'} //nolint:gochecknoglobals // protocol constant
var fallbackEpoch atomic.Uint32 //nolint:gochecknoglobals // crypto/rand fallback counter
var (
// ErrSessionClosed is returned when an operation is attempted on a closed session.
@@ -97,6 +100,8 @@ type Session struct {
reconnectCh chan struct{}
lastReconnect time.Time
reconnectCount int
localEpoch atomic.Uint32
peerEpoch atomic.Uint32
// peerEndpoint latches the MUC nick of the first occupant whose
// EndpointMessage passed the bridgeMagic check. Once set, all bridge
@@ -144,7 +149,7 @@ func New(_ context.Context, cfg engine.Config) (engine.Session, error) {
}
runCtx, cancel := context.WithCancel(context.Background())
return &Session{
s := &Session{
host: host,
room: room,
name: name,
@@ -154,7 +159,9 @@ func New(_ context.Context, cfg engine.Config) (engine.Session, error) {
done: make(chan struct{}),
cancel: cancel,
runCtx: runCtx,
}, nil
}
s.localEpoch.Store(randomEpoch())
return s, nil
}
// cyrillicToLatin maps Cyrillic runes to their Latin transliteration strings.
@@ -228,6 +235,22 @@ func isNickRune(r rune) bool {
return false
}
// randomEpoch returns a non-zero 32-bit epoch value.
//
// The value is normally drawn from crypto/rand. If that read fails, the
// package-level fallbackEpoch counter is incremented and used instead. Zero
// is never returned from either path: a random zero is replaced by 1, and a
// counter that wraps to zero is bumped once more.
func randomEpoch() uint32 {
	var raw [4]byte
	_, err := rand.Read(raw[:])
	if err == nil {
		if epoch := binary.BigEndian.Uint32(raw[:]); epoch != 0 {
			return epoch
		}
		return 1
	}

	// Entropy source unavailable: fall back to the monotonically
	// increasing counter, skipping zero on wrap-around.
	if epoch := fallbackEpoch.Add(1); epoch != 0 {
		return epoch
	}
	return fallbackEpoch.Add(1)
}
// Capabilities reports what this engine can do.
func (s *Session) Capabilities() engine.Capabilities {
return engine.Capabilities{ByteStream: true, VideoTrack: true}
@@ -592,12 +615,37 @@ func (s *Session) Send(data []byte) error {
if !s.bridgeReady.Load() {
return ErrBridgeNotReady
}
if len(data)+len(bridgeMagic) > bridgeMaxMessageSize {
framed, err := s.encodeBridgeFrame(data)
if err != nil {
return err
}
return s.enqueueBridgeFrame(framed)
}
// encodeBridgeFrame wraps data in the bridge wire format: the bridgeMagic
// prefix, then two big-endian uint32 words (this session's localEpoch
// followed by the currently stored peerEpoch), then data itself.
//
// It returns ErrSendTooLarge when the framed length would exceed
// bridgeMaxMessageSize. A nil or empty data yields a header-only frame.
func (s *Session) encodeBridgeFrame(data []byte) ([]byte, error) {
	const epochHeaderLen = 8 // two uint32 epoch words
	total := len(bridgeMagic) + epochHeaderLen + len(data)
	if total > bridgeMaxMessageSize {
		return nil, ErrSendTooLarge
	}

	out := make([]byte, 0, total)
	out = append(out, bridgeMagic[:]...)
	out = binary.BigEndian.AppendUint32(out, s.localEpoch.Load())
	out = binary.BigEndian.AppendUint32(out, s.peerEpoch.Load())
	out = append(out, data...)
	return out, nil
}
func (s *Session) enqueueBridgeFrame(framed []byte) error {
if s.closed.Load() {
return ErrSessionClosed
}
if !s.bridgeReady.Load() {
return ErrBridgeNotReady
}
if len(framed) > bridgeMaxMessageSize {
return ErrSendTooLarge
}
framed := make([]byte, len(bridgeMagic)+len(data))
copy(framed, bridgeMagic[:])
copy(framed[len(bridgeMagic):], data)
select {
case s.sendQueue <- framed:
return nil
@@ -618,10 +666,16 @@ func (s *Session) sendLoop() {
if !ok {
return
}
jSess := s.jSess.Load()
if !s.outboundFrameCurrent(data) {
continue
}
jSess := s.waitJSession()
if jSess == nil {
return
}
if !s.outboundFrameCurrent(data) {
continue
}
if err := jSess.BridgeSendRaw("", data); err != nil {
if s.closed.Load() {
return
@@ -632,6 +686,33 @@ func (s *Session) sendLoop() {
}
}
// waitJSession polls until a j.Session is stored in s.jSess and returns it.
//
// It returns nil instead when the session is marked closed or the done
// channel fires while waiting. The pointer is re-checked every retryDelay.
func (s *Session) waitJSession() *j.Session {
	const retryDelay = 10 * time.Millisecond
	for !s.closed.Load() {
		if current := s.jSess.Load(); current != nil {
			return current
		}
		select {
		case <-time.After(retryDelay):
			// Not available yet; loop around and look again.
		case <-s.done:
			return nil
		}
	}
	return nil
}
// outboundFrameCurrent reports whether an already-encoded frame was stamped
// with the session's present localEpoch.
//
// The sender epoch is read from the four bytes following bridgeMagic. Frames
// too short to hold the magic plus the 8-byte epoch header are never
// considered current.
func (s *Session) outboundFrameCurrent(frame []byte) bool {
	const epochHeaderLen = 8
	epochAt := len(bridgeMagic)
	if len(frame)-epochAt < epochHeaderLen {
		return false
	}
	stamped := binary.BigEndian.Uint32(frame[epochAt : epochAt+4])
	return stamped == s.localEpoch.Load()
}
func (s *Session) recvLoop() {
defer s.wg.Done()
@@ -675,10 +756,44 @@ func (s *Session) deliverBridgeMessage(msg j.BridgeMessage, ok bool) bool {
if !s.peerLatchAccepts(msg.From) {
return true
}
s.onData(payload[len(bridgeMagic):])
data, ok := s.acceptEpochFrame(payload)
if !ok {
return true
}
if len(data) == 0 {
return true
}
s.onData(data)
return true
}
// acceptEpochFrame validates the epoch header of an inbound bridge payload
// and, when the frame is acceptable, returns the bytes that follow the
// header together with true. On rejection it returns (nil, false).
//
// payload is expected to begin with bridgeMagic (not re-checked here),
// followed by the sender's epoch and the epoch the sender believes we have,
// both big-endian uint32.
//
// A frame is rejected when:
//   - it is too short to contain the magic and the 8-byte epoch header;
//   - the sender epoch is zero or equals our own local epoch;
//   - the receiver epoch is non-zero and differs from our local epoch
//     (the frame was addressed to a previous epoch of this session);
//   - the sender epoch differs from the peer epoch recorded earlier; in that
//     case the recorded peer epoch is replaced and a reconnect is requested.
//
// The first acceptable frame records the sender epoch as the peer epoch.
//
// NOTE(review): in the code visible here peerEpoch is only ever written by
// this method; confirm whether a local reconnect should also clear it, so
// that two peers cannot keep triggering each other's reconnects.
func (s *Session) acceptEpochFrame(payload []byte) ([]byte, bool) {
	const epochHeaderLen = 8
	off := len(bridgeMagic)
	if len(payload) < off+epochHeaderLen {
		return nil, false
	}
	senderEpoch := binary.BigEndian.Uint32(payload[off : off+4])
	receiverEpoch := binary.BigEndian.Uint32(payload[off+4 : off+epochHeaderLen])

	// Snapshot the local epoch once so every check below, and the log line,
	// operate on the same value even if it is rotated concurrently.
	local := s.localEpoch.Load()

	if senderEpoch == 0 || senderEpoch == local {
		return nil, false
	}
	if receiverEpoch != 0 && receiverEpoch != local {
		logger.Debugf("jitsi: drop stale bridge frame peerEpoch=0x%08x localEpoch=0x%08x",
			receiverEpoch, local)
		return nil, false
	}

	if prev := s.peerEpoch.Load(); prev == 0 {
		// First frame seen from the peer: remember its epoch.
		s.peerEpoch.Store(senderEpoch)
	} else if prev != senderEpoch {
		// The peer changed epoch. Only the caller that wins the swap
		// requests the reconnect; the triggering frame is dropped either way.
		if s.peerEpoch.CompareAndSwap(prev, senderEpoch) {
			s.requestReconnect("jitsi peer epoch changed")
		}
		return nil, false
	}
	return payload[off+epochHeaderLen:], true
}
// peerLatchAccepts implements the peer-latch logic: the first sender whose
// payload survived the magic check becomes our partner; everyone else is
// ignored. Cleared on reconnect by the supervisor (peerEndpoint is reset
@@ -879,6 +994,8 @@ func (s *Session) reconnect(ctx context.Context) error {
if oldPC != nil {
_ = oldPC.Close()
}
s.localEpoch.Store(randomEpoch())
s.drainSendQueue()
logger.Infof("jitsi: reconnecting %s/%s as %s ...", s.host, s.room, s.name)
jSess, err := s.joinAndOpenBridge(ctx)
@@ -889,6 +1006,9 @@ func (s *Session) reconnect(ctx context.Context) error {
s.peerEndpoint.Store(nil)
s.peerVideoSSRC.Store(0)
s.bridgeReady.Store(true)
if err := s.Send(nil); err != nil {
logger.Debugf("jitsi: epoch announce failed: %v", err)
}
s.wg.Add(1)
go s.recvLoop()
@@ -910,6 +1030,16 @@ func (s *Session) drainReconnectQueue() {
}
}
// drainSendQueue discards every frame currently buffered in sendQueue
// without blocking and returns as soon as the queue is empty.
//
// It also returns if sendQueue has been closed: a receive on a closed
// channel always succeeds immediately, so without that check the loop would
// never reach the default branch and would spin forever.
func (s *Session) drainSendQueue() {
	for {
		select {
		case _, ok := <-s.sendQueue:
			if !ok {
				// Closed channel: nothing left to drain, and further
				// receives would never block.
				return
			}
		default:
			return
		}
	}
}
// CanSend reports whether the session is ready to accept new data.
func (s *Session) CanSend() bool {
if s.closed.Load() {

View File

@@ -188,6 +188,67 @@ func TestDeliverBridgeMessageMagicAndPeerLatch(t *testing.T) {
}
}
func TestDeliverBridgeMessageDropsStalePeerEpoch(t *testing.T) {
sess, err := New(context.Background(), engine.Config{
URL: testHost,
Extra: map[string]string{credentialKeyRoom: testRoom},
})
if err != nil {
t.Fatalf("New: %v", err)
}
defer func() { _ = sess.Close() }()
js, ok := sess.(*Session)
if !ok {
t.Fatal("sess is not *Session")
}
js.localEpoch.Store(0x2222)
delivered := false
js.onData = func([]byte) { delivered = true }
stale := makeBridgeFrameForEpoch(t, 0x1111, 0xaaaa, []byte("old-smux"))
js.deliverBridgeMessage(makeBridgeMessageFrom("peerA", map[string]any{rawFieldKey: stale}), true)
if delivered {
t.Fatal("stale peer-epoch frame was delivered")
}
}
func TestDeliverBridgeMessagePeerEpochChangeRequestsReconnect(t *testing.T) {
sess, err := New(context.Background(), engine.Config{
URL: testHost,
Extra: map[string]string{credentialKeyRoom: testRoom},
})
if err != nil {
t.Fatalf("New: %v", err)
}
defer func() { _ = sess.Close() }()
js, ok := sess.(*Session)
if !ok {
t.Fatal("sess is not *Session")
}
js.localEpoch.Store(0x3333)
js.SetShouldReconnect(func() bool { return true })
var received [][]byte
js.onData = func(b []byte) {
received = append(received, append([]byte(nil), b...))
}
first := makeBridgeFrameForEpoch(t, 0x1111, 0, []byte("first"))
js.deliverBridgeMessage(makeBridgeMessageFrom("peerA", map[string]any{rawFieldKey: first}), true)
changed := makeBridgeFrameForEpoch(t, 0x2222, 0x3333, nil)
js.deliverBridgeMessage(makeBridgeMessageFrom("peerA", map[string]any{rawFieldKey: changed}), true)
if len(received) != 1 || string(received[0]) != "first" {
t.Fatalf("received = %q, want only first payload", received)
}
select {
case <-js.reconnectCh:
case <-time.After(time.Second):
t.Fatal("peer epoch change did not request reconnect")
}
}
func TestBridgeCloseRequestsReconnect(t *testing.T) {
sess, err := New(context.Background(), engine.Config{
URL: testHost,