diff --git a/internal/client/client.go b/internal/client/client.go index fc27084..0d2bca0 100644 --- a/internal/client/client.go +++ b/internal/client/client.go @@ -175,18 +175,19 @@ func (c *Client) bringUpLink( cancel context.CancelFunc, ) error { ln, err := transport.New(ctx, cfg.Transport, transport.Config{ - Carrier: cfg.Carrier, - RoomURL: cfg.RoomURL, - Engine: cfg.Engine, - URL: cfg.URL, - Token: cfg.Token, - ChannelID: cfg.ChannelID, - DeviceID: c.deviceID, - Name: names.Generate(), - OnData: c.onData, - DNSServer: cfg.DNSServer, - Options: cfg.TransportOptions, - Traffic: cfg.Traffic, + Carrier: cfg.Carrier, + RoomURL: cfg.RoomURL, + Engine: cfg.Engine, + URL: cfg.URL, + Token: cfg.Token, + ChannelID: cfg.ChannelID, + DeviceID: c.deviceID, + Name: names.Generate(), + OnData: c.onData, + DNSServer: cfg.DNSServer, + RequireTargetedPeer: true, + Options: cfg.TransportOptions, + Traffic: cfg.Traffic, }) if err != nil { return fmt.Errorf("failed to create link: %w", err) diff --git a/internal/engine/builtin/builtin.go b/internal/engine/builtin/builtin.go index da52506..5deb9bb 100644 --- a/internal/engine/builtin/builtin.go +++ b/internal/engine/builtin/builtin.go @@ -31,13 +31,14 @@ var ErrAuthFailed = errors.New("carrier auth failed") // Config holds the inputs to [Open]. The fields mirror the subset of // transport.Config that engines consume. type Config struct { - RoomURL string - Name string - OnData func([]byte) - OnPeerData func(peerID string, data []byte) - DNSServer string - ProxyAddr string - ProxyPort int + RoomURL string + Name string + OnData func([]byte) + OnPeerData func(peerID string, data []byte) + DNSServer string + ProxyAddr string + ProxyPort int + RequireTargetedPeer bool // Engine, URL, Token are honoured only for the "none" carrier (direct // engine access); other carriers derive them from their auth provider. Engine string @@ -91,14 +92,15 @@ func registerDirect(name string) { engineName = "livekit" } sess, err := engine.New(ctx, engineName, engine.Config{ - URL: cfg.URL, - Token: cfg.Token, - Name: cfg.Name, - OnData: cfg.OnData, - OnPeerData: cfg.OnPeerData, - DNSServer: cfg.DNSServer, - ProxyAddr: cfg.ProxyAddr, - ProxyPort: cfg.ProxyPort, + URL: cfg.URL, + Token: cfg.Token, + Name: cfg.Name, + OnData: cfg.OnData, + OnPeerData: cfg.OnPeerData, + DNSServer: cfg.DNSServer, + ProxyAddr: cfg.ProxyAddr, + ProxyPort: cfg.ProxyPort, + RequireTargetedPeer: cfg.RequireTargetedPeer, }) if err != nil { return nil, fmt.Errorf("engine new: %w", err) @@ -123,15 +125,16 @@ func registerEngineAuth(name string, provider auth.Provider) { return nil, fmt.Errorf("%w: %w", ErrAuthFailed, err) } sess, err := engine.New(ctx, provider.Engine(), engine.Config{ - URL: creds.URL, - Token: creds.Token, - Name: cfg.Name, - Extra: creds.Extra, - OnData: cfg.OnData, - OnPeerData: cfg.OnPeerData, - DNSServer: cfg.DNSServer, - ProxyAddr: cfg.ProxyAddr, - ProxyPort: cfg.ProxyPort, + URL: creds.URL, + Token: creds.Token, + Name: cfg.Name, + Extra: creds.Extra, + OnData: cfg.OnData, + OnPeerData: cfg.OnPeerData, + DNSServer: cfg.DNSServer, + ProxyAddr: cfg.ProxyAddr, + ProxyPort: cfg.ProxyPort, + RequireTargetedPeer: cfg.RequireTargetedPeer, Refresh: func(ctx context.Context) (engine.Credentials, error) { fresh, err := provider.Issue(ctx, authCfg) if err != nil { diff --git a/internal/engine/engine.go b/internal/engine/engine.go index fd44581..699b4a5 100644 --- a/internal/engine/engine.go +++ b/internal/engine/engine.go @@ -57,7 +57,11 @@ type Config struct { DNSServer string ProxyAddr string ProxyPort int - Refresh func(ctx context.Context) (Credentials, error) + // RequireTargetedPeer asks engines that multiplex room-wide messages to + // ignore single-peer broadcast frames until the remote has addressed this + // session's local epoch. + RequireTargetedPeer bool + Refresh func(ctx context.Context) (Credentials, error) } // Session is the engine-level runtime handle. It is shaped to match what diff --git a/internal/engine/jitsi/jitsi.go b/internal/engine/jitsi/jitsi.go index 2db2b5e..df33f91 100644 --- a/internal/engine/jitsi/jitsi.go +++ b/internal/engine/jitsi/jitsi.go @@ -99,11 +99,12 @@ type Session struct { room string name string - onData func([]byte) - onPeerData func(peerID string, data []byte) - onReconnect func(*webrtc.DataChannel) - shouldReconnect func() bool - onEnded func(string) + onData func([]byte) + onPeerData func(peerID string, data []byte) + requireTargetedPeer bool + onReconnect func(*webrtc.DataChannel) + shouldReconnect func() bool + onEnded func(string) jSess atomic.Pointer[j.Session] @@ -185,18 +186,19 @@ func New(_ context.Context, cfg engine.Config) (engine.Session, error) { runCtx, cancel := context.WithCancel(context.Background()) s := &Session{ - host: host, - room: room, - name: name, - onData: cfg.OnData, - onPeerData: cfg.OnPeerData, - sendQueue: make(chan []byte, defaultSendQueueSize), - peerSendQueue: make(chan bridgeOutbound, defaultSendQueueSize), - peerEpochs: make(map[string]uint32), - reconnectCh: make(chan struct{}, 1), - done: make(chan struct{}), - cancel: cancel, - runCtx: runCtx, + host: host, + room: room, + name: name, + onData: cfg.OnData, + onPeerData: cfg.OnPeerData, + requireTargetedPeer: cfg.RequireTargetedPeer, + sendQueue: make(chan []byte, defaultSendQueueSize), + peerSendQueue: make(chan bridgeOutbound, defaultSendQueueSize), + peerEpochs: make(map[string]uint32), + reconnectCh: make(chan struct{}, 1), + done: make(chan struct{}), + cancel: cancel, + runCtx: runCtx, } s.localEpoch.Store(randomEpoch()) return s, nil @@ -1117,13 +1119,13 @@ func (s *Session) deliverBridgeMessage(msg j.BridgeMessage, ok bool) bool { if s.onPeerData != nil && msg.From != "" { return s.deliverPeerBridgePayload(msg.From, payload) } - if !s.peerLatchAccepts(msg.From) { - return true - } data, ok := s.acceptEpochFrame(payload) if !ok { return true } + if !s.peerLatchAccepts(msg.From) { + return true + } if len(data) == 0 { return true } @@ -1192,6 +1194,11 @@ func (s *Session) acceptEpochFrame(payload []byte) ([]byte, bool) { receiverEpoch, s.localEpoch.Load()) return nil, false } + if s.requireTargetedPeer && s.onPeerData == nil && receiverEpoch != s.localEpoch.Load() { + logger.Debugf("jitsi: drop untargeted bridge frame senderEpoch=0x%08x localEpoch=0x%08x", + senderEpoch, s.localEpoch.Load()) + return nil, false + } // Update the peer-epoch latch and ALWAYS accept the frame. // // Earlier revisions reconnected ourselves whenever the peer's epoch diff --git a/internal/engine/jitsi/jitsi_test.go b/internal/engine/jitsi/jitsi_test.go index 8b961c0..a22593e 100644 --- a/internal/engine/jitsi/jitsi_test.go +++ b/internal/engine/jitsi/jitsi_test.go @@ -320,6 +320,52 @@ func TestReconnectEpochAnnounceWithZeroPeerEpochIsAccepted(t *testing.T) { } } +func TestRequireTargetedPeerDropsOtherClientBroadcastBeforeLatch(t *testing.T) { + var received [][]byte + sess, err := New(context.Background(), engine.Config{ + URL: testHost, + Extra: map[string]string{credentialKeyRoom: testRoom}, + RequireTargetedPeer: true, + OnData: func(b []byte) { + received = append(received, append([]byte(nil), b...)) + }, + }) + if err != nil { + t.Fatalf("New: %v", err) + } + defer func() { _ = sess.Close() }() + + js, ok := sess.(*Session) + if !ok { + t.Fatal("sess is not *Session") + } + js.localEpoch.Store(0x3333) + + otherClient := makeBridgeFrameForEpoch(t, 0x2222, 0, []byte("CLIENT_HELLO")) + js.deliverBridgeMessage(makeBridgeMessageFrom("clientB", map[string]any{rawFieldKey: otherClient}), true) + if len(received) != 0 { + t.Fatalf("received other client broadcast = %q, want none", received) + } + if got := js.peerEpoch.Load(); got != 0 { + t.Fatalf("peerEpoch after other client broadcast = 0x%08x, want 0", got) + } + if got := js.peerEndpoint.Load(); got != nil { + t.Fatalf("peerEndpoint after other client broadcast = %q, want nil", *got) + } + + serverWelcome := makeBridgeFrameForEpoch(t, 0x1111, 0x3333, []byte("SERVER_WELCOME")) + js.deliverBridgeMessage(makeBridgeMessageFrom("server", map[string]any{rawFieldKey: serverWelcome}), true) + if len(received) != 1 || string(received[0]) != "SERVER_WELCOME" { + t.Fatalf("received = %q, want targeted server welcome", received) + } + if got := js.peerEpoch.Load(); got != 0x1111 { + t.Fatalf("peerEpoch = 0x%08x, want server epoch", got) + } + if got := js.peerEndpoint.Load(); got == nil || *got != "server" { + t.Fatalf("peerEndpoint = %v, want server", got) + } +} + // TestDeliverBridgeMessagePeerEpochChangeAcceptsFrameNoReconnect codifies // the post-fix behaviour: when a peer's epoch flips (because the peer // reconnected), we update our latch and ACCEPT the new frame instead of diff --git a/internal/transport/datachannel/transport.go b/internal/transport/datachannel/transport.go index eb5b1a8..13b7107 100644 --- a/internal/transport/datachannel/transport.go +++ b/internal/transport/datachannel/transport.go @@ -24,16 +24,17 @@ type streamTransport struct { // New creates a datachannel transport backed by a carrier engine. func New(ctx context.Context, cfg transport.Config) (transport.Transport, error) { sess, err := enginebuiltin.Open(ctx, cfg.Carrier, enginebuiltin.Config{ - RoomURL: cfg.RoomURL, - Name: cfg.Name, - OnData: cfg.OnData, - OnPeerData: cfg.OnPeerData, - DNSServer: cfg.DNSServer, - ProxyAddr: cfg.ProxyAddr, - ProxyPort: cfg.ProxyPort, - Engine: cfg.Engine, - URL: cfg.URL, - Token: cfg.Token, + RoomURL: cfg.RoomURL, + Name: cfg.Name, + OnData: cfg.OnData, + OnPeerData: cfg.OnPeerData, + DNSServer: cfg.DNSServer, + ProxyAddr: cfg.ProxyAddr, + ProxyPort: cfg.ProxyPort, + Engine: cfg.Engine, + URL: cfg.URL, + Token: cfg.Token, + RequireTargetedPeer: cfg.RequireTargetedPeer, }) if err != nil { return nil, fmt.Errorf("open engine session: %w", err) diff --git a/internal/transport/transport.go b/internal/transport/transport.go index be2af30..61c267e 100644 --- a/internal/transport/transport.go +++ b/internal/transport/transport.go @@ -88,6 +88,12 @@ type Config struct { ProxyAddr string ProxyPort int + // RequireTargetedPeer makes single-peer engines ignore broadcast frames + // from unrelated olcrtc clients until a peer sends a frame addressed to + // this session's local epoch. Server-side transports leave this disabled + // so they can accept initial broadcast CLIENT_HELLO frames. + RequireTargetedPeer bool + // Options carries transport-specific tuning. Type is per-transport-package. Options Options