diff --git a/internal/engine/jitsi/jitsi.go b/internal/engine/jitsi/jitsi.go index e4d77d1..8c99692 100644 --- a/internal/engine/jitsi/jitsi.go +++ b/internal/engine/jitsi/jitsi.go @@ -45,6 +45,8 @@ const ( defaultNick = "olcrtc" credentialKeyRoom = "room" videoTrackName = "videochannel" + maxReconnects = 5 + reconnectWindow = 5 * time.Minute ) // bridgeMagic tags every EndpointMessage produced by this engine. JVB broadcasts @@ -87,20 +89,25 @@ type Session struct { pcMu sync.Mutex pc *webrtc.PeerConnection - sendQueue chan []byte - bridgeReady atomic.Bool - closed atomic.Bool + sendQueue chan []byte + bridgeReady atomic.Bool + closed atomic.Bool + reconnecting atomic.Bool + + reconnectCh chan struct{} + lastReconnect time.Time + reconnectCount int // peerEndpoint latches the MUC nick of the first occupant whose // EndpointMessage passed the bridgeMagic check. Once set, all bridge // messages from other senders are dropped, isolating us from chatter by // unrelated olcrtc processes that happen to share the same room. peerEndpoint atomic.Pointer[string] - done chan struct{} - doneOnce sync.Once - cancel context.CancelFunc - runCtx context.Context //nolint:containedctx // engine owns the supervisor lifetime - wg sync.WaitGroup + done chan struct{} + doneOnce sync.Once + cancel context.CancelFunc + runCtx context.Context //nolint:containedctx // engine owns the supervisor lifetime + wg sync.WaitGroup videoTrackMu sync.RWMutex videoTracks []webrtc.TrackLocal @@ -138,14 +145,15 @@ func New(_ context.Context, cfg engine.Config) (engine.Session, error) { runCtx, cancel := context.WithCancel(context.Background()) return &Session{ - host: host, - room: room, - name: name, - onData: cfg.OnData, - sendQueue: make(chan []byte, defaultSendQueueSize), - done: make(chan struct{}), - cancel: cancel, - runCtx: runCtx, + host: host, + room: room, + name: name, + onData: cfg.OnData, + sendQueue: make(chan []byte, defaultSendQueueSize), + reconnectCh: make(chan struct{}, 1), + done: make(chan struct{}), + cancel: cancel, + runCtx: runCtx, }, nil } @@ -233,6 +241,19 @@ func (s *Session) Connect(ctx context.Context) error { return ErrSessionClosed } + jSess, err := s.joinAndOpenBridge(ctx) + if err != nil { + return err + } + s.jSess.Store(jSess) + + s.wg.Add(2) + go s.sendLoop() + go s.recvLoop() + return nil +} + +func (s *Session) joinAndOpenBridge(ctx context.Context) (*j.Session, error) { logger.Infof("jitsi: joining %s/%s as %s …", s.host, s.room, s.name) jSess, err := j.Join(ctx, j.Config{ Host: s.host, @@ -241,17 +262,17 @@ func (s *Session) Connect(ctx context.Context) error { Debug: logger.IsVerbose(), }) if err != nil { - return fmt.Errorf("jitsi join: %w", err) + return nil, fmt.Errorf("jitsi join: %w", err) } logger.Infof("jitsi: joined %s/%s; colibri-ws=%s", s.host, s.room, jSess.ColibriWS) - s.jSess.Store(jSess) if s.onData != nil { bctx, bcancel := context.WithTimeout(ctx, bridgeOpenTimeout) err := jSess.OpenBridge(bctx) bcancel() if err != nil { - return fmt.Errorf("open bridge: %w", err) + _ = jSess.Close() + return nil, fmt.Errorf("open bridge: %w", err) } // Re-latch peer on every bridge open: after a reconnect the partner's // MUC nick may have changed. @@ -263,14 +284,12 @@ func (s *Session) Connect(ctx context.Context) error { if s.shouldNegotiatePC() { if err := s.negotiatePC(ctx, jSess); err != nil { - return err + _ = jSess.Close() + return nil, err } } - s.wg.Add(2) - go s.sendLoop() - go s.recvLoop() - return nil + return jSess, nil } func (s *Session) shouldNegotiatePC() bool { @@ -477,7 +496,6 @@ func (s *Session) trickleDrainLoop(pc *webrtc.PeerConnection, neg negotiator, st } } - // xmlCandidate is a minimal XML representation of a Jingle ICE candidate. type xmlCandidate struct { Component string `xml:"component,attr"` @@ -495,8 +513,8 @@ type xmlCandidate struct { // xmlTransportInfo is the minimal structure needed to extract candidates // from a stanza. type xmlTransportInfo struct { - XMLName xml.Name `xml:"iq"` - Jingle struct { + XMLName xml.Name `xml:"iq"` + Jingle struct { Action string `xml:"action,attr"` Contents []struct { Name string `xml:"name,attr"` @@ -643,7 +661,7 @@ func (s *Session) recvLoop() { func (s *Session) deliverBridgeMessage(msg j.BridgeMessage, ok bool) bool { if !ok { if !s.closed.Load() { - s.signalEnded("jitsi bridge closed") + s.requestReconnect("jitsi bridge closed") } return false } @@ -769,26 +787,126 @@ func (s *Session) Close() error { } // SetReconnectCallback registers a callback for reconnection events. -// -// The Jitsi engine itself does not currently drive a reconnect loop; the -// callback is stored for API parity and wired through the carrier adapter -// for future use. func (s *Session) SetReconnectCallback(cb func(*webrtc.DataChannel)) { s.onReconnect = cb } -// SetShouldReconnect stores the reconnect predicate (kept for API parity). +// SetShouldReconnect stores the reconnect predicate. func (s *Session) SetShouldReconnect(fn func() bool) { s.shouldReconnect = fn } // SetEndedCallback registers a function to call when the session ends. func (s *Session) SetEndedCallback(cb func(string)) { s.onEnded = cb } -// WatchConnection blocks until the session is closed, the parent context -// fires, or the bridge tears down. +// WatchConnection monitors bridge lifecycle and reconnects when JVB closes +// the endpoint's colibri-ws without ending the XMPP conference. func (s *Session) WatchConnection(ctx context.Context) { + for { + select { + case <-ctx.Done(): + return + case <-s.done: + return + case <-s.reconnectCh: + if s.handleReconnectAttempt(ctx) { + return + } + } + } +} + +func (s *Session) requestReconnect(reason string) { + s.bridgeReady.Store(false) + if s.closed.Load() || s.reconnecting.Load() { + return + } + if s.shouldReconnect != nil && !s.shouldReconnect() { + s.signalEnded(reason) + return + } + logger.Infof("jitsi reconnect requested: %s", reason) select { - case <-ctx.Done(): - return - case <-s.done: - return + case s.reconnectCh <- struct{}{}: + default: + } +} + +func (s *Session) handleReconnectAttempt(ctx context.Context) bool { + if time.Since(s.lastReconnect) > reconnectWindow { + s.reconnectCount = 0 + } + s.reconnectCount++ + s.lastReconnect = time.Now() + + if s.reconnectCount > maxReconnects { + s.signalEnded("jitsi reconnect limit reached") + return true + } + + backoff := time.Duration(s.reconnectCount) * 2 * time.Second + if backoff > 30*time.Second { + backoff = 30 * time.Second + } + + for { + if err := s.reconnect(ctx); err != nil { + logger.Warnf("jitsi reconnect failed: %v", err) + select { + case <-ctx.Done(): + return true + case <-s.done: + return true + case <-time.After(backoff): + continue + } + } + s.drainReconnectQueue() + return false + } +} + +func (s *Session) reconnect(ctx context.Context) error { + if !s.reconnecting.CompareAndSwap(false, true) { + return nil + } + defer s.reconnecting.Store(false) + + s.bridgeReady.Store(false) + if old := s.jSess.Swap(nil); old != nil { + _ = old.Close() + } + s.pcMu.Lock() + oldPC := s.pc + s.pc = nil + s.pcMu.Unlock() + if oldPC != nil { + _ = oldPC.Close() + } + + logger.Infof("jitsi: reconnecting %s/%s as %s ...", s.host, s.room, s.name) + jSess, err := s.joinAndOpenBridge(ctx) + if err != nil { + return err + } + s.jSess.Store(jSess) + s.peerEndpoint.Store(nil) + s.peerVideoSSRC.Store(0) + s.bridgeReady.Store(true) + + s.wg.Add(1) + go s.recvLoop() + + if s.onReconnect != nil { + s.onReconnect(nil) + } + logger.Infof("jitsi: reconnected %s/%s; colibri-ws=%s", s.host, s.room, jSess.ColibriWS) + return nil +} + +func (s *Session) drainReconnectQueue() { + for { + select { + case <-s.reconnectCh: + default: + return + } } } diff --git a/internal/engine/jitsi/jitsi_test.go b/internal/engine/jitsi/jitsi_test.go index 219b87c..c00cd84 100644 --- a/internal/engine/jitsi/jitsi_test.go +++ b/internal/engine/jitsi/jitsi_test.go @@ -4,8 +4,10 @@ import ( "context" "errors" "testing" + "time" "github.com/openlibrecommunity/olcrtc/internal/engine" + "github.com/zarazaex69/j" ) const ( @@ -186,6 +188,57 @@ func TestDeliverBridgeMessageMagicAndPeerLatch(t *testing.T) { } } +func TestBridgeCloseRequestsReconnect(t *testing.T) { + sess, err := New(context.Background(), engine.Config{ + URL: testHost, + Extra: map[string]string{credentialKeyRoom: testRoom}, + }) + if err != nil { + t.Fatalf("New: %v", err) + } + defer func() { _ = sess.Close() }() + + js := sess.(*Session) + var ended string + js.SetEndedCallback(func(reason string) { ended = reason }) + js.SetShouldReconnect(func() bool { return true }) + + if js.deliverBridgeMessage(j.BridgeMessage{}, false) { + t.Fatal("deliverBridgeMessage returned true on closed bridge") + } + select { + case <-js.reconnectCh: + case <-time.After(time.Second): + t.Fatal("bridge close did not request reconnect") + } + if ended != "" { + t.Fatalf("ended = %q, want empty", ended) + } +} + +func TestBridgeCloseEndsWhenReconnectDisabled(t *testing.T) { + sess, err := New(context.Background(), engine.Config{ + URL: testHost, + Extra: map[string]string{credentialKeyRoom: testRoom}, + }) + if err != nil { + t.Fatalf("New: %v", err) + } + defer func() { _ = sess.Close() }() + + js := sess.(*Session) + var ended string + js.SetEndedCallback(func(reason string) { ended = reason }) + js.SetShouldReconnect(func() bool { return false }) + + if js.deliverBridgeMessage(j.BridgeMessage{}, false) { + t.Fatal("deliverBridgeMessage returned true on closed bridge") + } + if ended != "jitsi bridge closed" { + t.Fatalf("ended = %q, want bridge close reason", ended) + } +} + func TestEngineRegistration(t *testing.T) { if _, err := engine.New(context.Background(), "jitsi", engine.Config{ URL: testHost,