fix(jitsi): require targeted peer frames for clients

This commit is contained in:
zarazaex69
2026-06-05 18:11:46 +03:00
parent 6b9a69c100
commit 7a6cb7f772
7 changed files with 135 additions and 67 deletions

View File

@@ -175,18 +175,19 @@ func (c *Client) bringUpLink(
cancel context.CancelFunc,
) error {
ln, err := transport.New(ctx, cfg.Transport, transport.Config{
Carrier: cfg.Carrier,
RoomURL: cfg.RoomURL,
Engine: cfg.Engine,
URL: cfg.URL,
Token: cfg.Token,
ChannelID: cfg.ChannelID,
DeviceID: c.deviceID,
Name: names.Generate(),
OnData: c.onData,
DNSServer: cfg.DNSServer,
Options: cfg.TransportOptions,
Traffic: cfg.Traffic,
Carrier: cfg.Carrier,
RoomURL: cfg.RoomURL,
Engine: cfg.Engine,
URL: cfg.URL,
Token: cfg.Token,
ChannelID: cfg.ChannelID,
DeviceID: c.deviceID,
Name: names.Generate(),
OnData: c.onData,
DNSServer: cfg.DNSServer,
RequireTargetedPeer: true,
Options: cfg.TransportOptions,
Traffic: cfg.Traffic,
})
if err != nil {
return fmt.Errorf("failed to create link: %w", err)

View File

@@ -31,13 +31,14 @@ var ErrAuthFailed = errors.New("carrier auth failed")
// Config holds the inputs to [Open]. The fields mirror the subset of
// transport.Config that engines consume.
type Config struct {
RoomURL string
Name string
OnData func([]byte)
OnPeerData func(peerID string, data []byte)
DNSServer string
ProxyAddr string
ProxyPort int
RoomURL string
Name string
OnData func([]byte)
OnPeerData func(peerID string, data []byte)
DNSServer string
ProxyAddr string
ProxyPort int
RequireTargetedPeer bool
// Engine, URL, Token are honoured only for the "none" carrier (direct
// engine access); other carriers derive them from their auth provider.
Engine string
@@ -91,14 +92,15 @@ func registerDirect(name string) {
engineName = "livekit"
}
sess, err := engine.New(ctx, engineName, engine.Config{
URL: cfg.URL,
Token: cfg.Token,
Name: cfg.Name,
OnData: cfg.OnData,
OnPeerData: cfg.OnPeerData,
DNSServer: cfg.DNSServer,
ProxyAddr: cfg.ProxyAddr,
ProxyPort: cfg.ProxyPort,
URL: cfg.URL,
Token: cfg.Token,
Name: cfg.Name,
OnData: cfg.OnData,
OnPeerData: cfg.OnPeerData,
DNSServer: cfg.DNSServer,
ProxyAddr: cfg.ProxyAddr,
ProxyPort: cfg.ProxyPort,
RequireTargetedPeer: cfg.RequireTargetedPeer,
})
if err != nil {
return nil, fmt.Errorf("engine new: %w", err)
@@ -123,15 +125,16 @@ func registerEngineAuth(name string, provider auth.Provider) {
return nil, fmt.Errorf("%w: %w", ErrAuthFailed, err)
}
sess, err := engine.New(ctx, provider.Engine(), engine.Config{
URL: creds.URL,
Token: creds.Token,
Name: cfg.Name,
Extra: creds.Extra,
OnData: cfg.OnData,
OnPeerData: cfg.OnPeerData,
DNSServer: cfg.DNSServer,
ProxyAddr: cfg.ProxyAddr,
ProxyPort: cfg.ProxyPort,
URL: creds.URL,
Token: creds.Token,
Name: cfg.Name,
Extra: creds.Extra,
OnData: cfg.OnData,
OnPeerData: cfg.OnPeerData,
DNSServer: cfg.DNSServer,
ProxyAddr: cfg.ProxyAddr,
ProxyPort: cfg.ProxyPort,
RequireTargetedPeer: cfg.RequireTargetedPeer,
Refresh: func(ctx context.Context) (engine.Credentials, error) {
fresh, err := provider.Issue(ctx, authCfg)
if err != nil {

View File

@@ -57,7 +57,11 @@ type Config struct {
DNSServer string
ProxyAddr string
ProxyPort int
Refresh func(ctx context.Context) (Credentials, error)
// RequireTargetedPeer asks engines that multiplex room-wide messages to
// ignore single-peer broadcast frames until the remote has addressed this
// session's local epoch.
RequireTargetedPeer bool
Refresh func(ctx context.Context) (Credentials, error)
}
// Session is the engine-level runtime handle. It is shaped to match what

View File

@@ -99,11 +99,12 @@ type Session struct {
room string
name string
onData func([]byte)
onPeerData func(peerID string, data []byte)
onReconnect func(*webrtc.DataChannel)
shouldReconnect func() bool
onEnded func(string)
onData func([]byte)
onPeerData func(peerID string, data []byte)
requireTargetedPeer bool
onReconnect func(*webrtc.DataChannel)
shouldReconnect func() bool
onEnded func(string)
jSess atomic.Pointer[j.Session]
@@ -185,18 +186,19 @@ func New(_ context.Context, cfg engine.Config) (engine.Session, error) {
runCtx, cancel := context.WithCancel(context.Background())
s := &Session{
host: host,
room: room,
name: name,
onData: cfg.OnData,
onPeerData: cfg.OnPeerData,
sendQueue: make(chan []byte, defaultSendQueueSize),
peerSendQueue: make(chan bridgeOutbound, defaultSendQueueSize),
peerEpochs: make(map[string]uint32),
reconnectCh: make(chan struct{}, 1),
done: make(chan struct{}),
cancel: cancel,
runCtx: runCtx,
host: host,
room: room,
name: name,
onData: cfg.OnData,
onPeerData: cfg.OnPeerData,
requireTargetedPeer: cfg.RequireTargetedPeer,
sendQueue: make(chan []byte, defaultSendQueueSize),
peerSendQueue: make(chan bridgeOutbound, defaultSendQueueSize),
peerEpochs: make(map[string]uint32),
reconnectCh: make(chan struct{}, 1),
done: make(chan struct{}),
cancel: cancel,
runCtx: runCtx,
}
s.localEpoch.Store(randomEpoch())
return s, nil
@@ -1117,13 +1119,13 @@ func (s *Session) deliverBridgeMessage(msg j.BridgeMessage, ok bool) bool {
if s.onPeerData != nil && msg.From != "" {
return s.deliverPeerBridgePayload(msg.From, payload)
}
if !s.peerLatchAccepts(msg.From) {
return true
}
data, ok := s.acceptEpochFrame(payload)
if !ok {
return true
}
if !s.peerLatchAccepts(msg.From) {
return true
}
if len(data) == 0 {
return true
}
@@ -1192,6 +1194,11 @@ func (s *Session) acceptEpochFrame(payload []byte) ([]byte, bool) {
receiverEpoch, s.localEpoch.Load())
return nil, false
}
if s.requireTargetedPeer && s.onPeerData == nil && receiverEpoch != s.localEpoch.Load() {
logger.Debugf("jitsi: drop untargeted bridge frame senderEpoch=0x%08x localEpoch=0x%08x",
senderEpoch, s.localEpoch.Load())
return nil, false
}
// Update the peer-epoch latch and ALWAYS accept the frame.
//
// Earlier revisions reconnected ourselves whenever the peer's epoch

View File

@@ -320,6 +320,52 @@ func TestReconnectEpochAnnounceWithZeroPeerEpochIsAccepted(t *testing.T) {
}
}
func TestRequireTargetedPeerDropsOtherClientBroadcastBeforeLatch(t *testing.T) {
var received [][]byte
sess, err := New(context.Background(), engine.Config{
URL: testHost,
Extra: map[string]string{credentialKeyRoom: testRoom},
RequireTargetedPeer: true,
OnData: func(b []byte) {
received = append(received, append([]byte(nil), b...))
},
})
if err != nil {
t.Fatalf("New: %v", err)
}
defer func() { _ = sess.Close() }()
js, ok := sess.(*Session)
if !ok {
t.Fatal("sess is not *Session")
}
js.localEpoch.Store(0x3333)
otherClient := makeBridgeFrameForEpoch(t, 0x2222, 0, []byte("CLIENT_HELLO"))
js.deliverBridgeMessage(makeBridgeMessageFrom("clientB", map[string]any{rawFieldKey: otherClient}), true)
if len(received) != 0 {
t.Fatalf("received other client broadcast = %q, want none", received)
}
if got := js.peerEpoch.Load(); got != 0 {
t.Fatalf("peerEpoch after other client broadcast = 0x%08x, want 0", got)
}
if got := js.peerEndpoint.Load(); got != nil {
t.Fatalf("peerEndpoint after other client broadcast = %q, want nil", *got)
}
serverWelcome := makeBridgeFrameForEpoch(t, 0x1111, 0x3333, []byte("SERVER_WELCOME"))
js.deliverBridgeMessage(makeBridgeMessageFrom("server", map[string]any{rawFieldKey: serverWelcome}), true)
if len(received) != 1 || string(received[0]) != "SERVER_WELCOME" {
t.Fatalf("received = %q, want targeted server welcome", received)
}
if got := js.peerEpoch.Load(); got != 0x1111 {
t.Fatalf("peerEpoch = 0x%08x, want server epoch", got)
}
if got := js.peerEndpoint.Load(); got == nil || *got != "server" {
t.Fatalf("peerEndpoint = %v, want server", got)
}
}
// TestDeliverBridgeMessagePeerEpochChangeAcceptsFrameNoReconnect codifies
// the post-fix behaviour: when a peer's epoch flips (because the peer
// reconnected), we update our latch and ACCEPT the new frame instead of

View File

@@ -24,16 +24,17 @@ type streamTransport struct {
// New creates a datachannel transport backed by a carrier engine.
func New(ctx context.Context, cfg transport.Config) (transport.Transport, error) {
sess, err := enginebuiltin.Open(ctx, cfg.Carrier, enginebuiltin.Config{
RoomURL: cfg.RoomURL,
Name: cfg.Name,
OnData: cfg.OnData,
OnPeerData: cfg.OnPeerData,
DNSServer: cfg.DNSServer,
ProxyAddr: cfg.ProxyAddr,
ProxyPort: cfg.ProxyPort,
Engine: cfg.Engine,
URL: cfg.URL,
Token: cfg.Token,
RoomURL: cfg.RoomURL,
Name: cfg.Name,
OnData: cfg.OnData,
OnPeerData: cfg.OnPeerData,
DNSServer: cfg.DNSServer,
ProxyAddr: cfg.ProxyAddr,
ProxyPort: cfg.ProxyPort,
Engine: cfg.Engine,
URL: cfg.URL,
Token: cfg.Token,
RequireTargetedPeer: cfg.RequireTargetedPeer,
})
if err != nil {
return nil, fmt.Errorf("open engine session: %w", err)

View File

@@ -88,6 +88,12 @@ type Config struct {
ProxyAddr string
ProxyPort int
// RequireTargetedPeer makes single-peer engines ignore broadcast frames
// from unrelated olcrtc clients until a peer sends a frame addressed to
// this session's local epoch. Server-side transports leave this disabled
// so they can accept initial broadcast CLIENT_HELLO frames.
RequireTargetedPeer bool
// Options carries transport-specific tuning. Type is per-transport-package.
Options Options