diff --git a/internal/engine/jitsi/helpers_test.go b/internal/engine/jitsi/helpers_test.go index c5ee6b1..e908d10 100644 --- a/internal/engine/jitsi/helpers_test.go +++ b/internal/engine/jitsi/helpers_test.go @@ -19,9 +19,9 @@ func makeBridgeMessage(class string, fields map[string]any) j.BridgeMessage { } } -func makeBridgeMessageFrom(class, from string, fields map[string]any) j.BridgeMessage { +func makeBridgeMessageFrom(from string, fields map[string]any) j.BridgeMessage { return j.BridgeMessage{ - Class: class, + Class: "EndpointMessage", From: from, Fields: fields, } diff --git a/internal/engine/jitsi/jitsi.go b/internal/engine/jitsi/jitsi.go index bdb10f0..e4d77d1 100644 --- a/internal/engine/jitsi/jitsi.go +++ b/internal/engine/jitsi/jitsi.go @@ -654,26 +654,31 @@ func (s *Session) deliverBridgeMessage(msg j.BridgeMessage, ok bool) bool { if len(payload) < len(bridgeMagic) || !bytes.Equal(payload[:len(bridgeMagic)], bridgeMagic[:]) { return true } - // peer-latch: the first sender whose payload survived the magic check - // becomes our partner; everyone else is ignored. Cleared on reconnect by - // the supervisor (peerEndpoint is reset whenever the bridge is reopened). - if cur := s.peerEndpoint.Load(); cur != nil { - if *cur != msg.From { - return true - } - } else if msg.From != "" { - from := msg.From - s.peerEndpoint.CompareAndSwap(nil, &from) - // Re-check after CAS: a concurrent latch may have picked a different - // peer first; if so, drop this frame. - if cur := s.peerEndpoint.Load(); cur != nil && *cur != msg.From { - return true - } + if !s.peerLatchAccepts(msg.From) { + return true } s.onData(payload[len(bridgeMagic):]) return true } +// peerLatchAccepts implements the peer-latch logic: the first sender whose +// payload survived the magic check becomes our partner; everyone else is +// ignored. Cleared on reconnect by the supervisor (peerEndpoint is reset +// whenever the bridge is reopened). +func (s *Session) peerLatchAccepts(from string) bool { + if cur := s.peerEndpoint.Load(); cur != nil { + return *cur == from + } + if from == "" { + return true + } + s.peerEndpoint.CompareAndSwap(nil, &from) + // Re-check after CAS: a concurrent latch may have picked a different + // peer first; if so, drop this frame. + cur := s.peerEndpoint.Load() + return cur == nil || *cur == from +} + // decodeRaw extracts the bytes from an EndpointMessage produced by the j // library's BridgeSendRaw helper. Mirrors the unexported colibri.DecodeRaw — // the j library's BridgeMessage type alias keeps the necessary fields public, diff --git a/internal/engine/jitsi/jitsi_test.go b/internal/engine/jitsi/jitsi_test.go index 4a43049..219b87c 100644 --- a/internal/engine/jitsi/jitsi_test.go +++ b/internal/engine/jitsi/jitsi_test.go @@ -154,7 +154,10 @@ func TestDeliverBridgeMessageMagicAndPeerLatch(t *testing.T) { } defer func() { _ = sess.Close() }() - js := sess.(*Session) + js, ok := sess.(*Session) + if !ok { + t.Fatal("sess is not *Session") + } var received [][]byte js.onData = func(b []byte) { received = append(received, append([]byte(nil), b...)) @@ -164,16 +167,16 @@ func TestDeliverBridgeMessageMagicAndPeerLatch(t *testing.T) { bad := encodeForTest(t, []byte("alpha")) // no magic prefix // First valid frame from peerA latches the peer and is delivered. - if !js.deliverBridgeMessage(makeBridgeMessageFrom(classEndpoint, "peerA", map[string]any{rawFieldKey: good}), true) { + if !js.deliverBridgeMessage(makeBridgeMessageFrom("peerA", map[string]any{rawFieldKey: good}), true) { t.Fatal("deliverBridgeMessage returned false on valid frame") } // Frame without magic is dropped. - js.deliverBridgeMessage(makeBridgeMessageFrom(classEndpoint, "peerA", map[string]any{rawFieldKey: bad}), true) + js.deliverBridgeMessage(makeBridgeMessageFrom("peerA", map[string]any{rawFieldKey: bad}), true) // Frame from a different sender after latch is dropped even with magic. - js.deliverBridgeMessage(makeBridgeMessageFrom(classEndpoint, "peerB", map[string]any{rawFieldKey: good}), true) + js.deliverBridgeMessage(makeBridgeMessageFrom("peerB", map[string]any{rawFieldKey: good}), true) // Another frame from latched peer still flows. beta := makeBridgeFrame(t, []byte("beta")) - js.deliverBridgeMessage(makeBridgeMessageFrom(classEndpoint, "peerA", map[string]any{rawFieldKey: beta}), true) + js.deliverBridgeMessage(makeBridgeMessageFrom("peerA", map[string]any{rawFieldKey: beta}), true) if len(received) != 2 { t.Fatalf("received frames = %d, want 2 (%q)", len(received), received)