mirror of
https://github.com/openlibrecommunity/olcrtc.git
synced 2026-05-30 17:09:43 +00:00
feat(jitsi): add automatic bridge reconnection
This commit is contained in:
@@ -45,6 +45,8 @@ const (
|
||||
defaultNick = "olcrtc"
|
||||
credentialKeyRoom = "room"
|
||||
videoTrackName = "videochannel"
|
||||
maxReconnects = 5
|
||||
reconnectWindow = 5 * time.Minute
|
||||
)
|
||||
|
||||
// bridgeMagic tags every EndpointMessage produced by this engine. JVB broadcasts
|
||||
@@ -87,20 +89,25 @@ type Session struct {
|
||||
pcMu sync.Mutex
|
||||
pc *webrtc.PeerConnection
|
||||
|
||||
sendQueue chan []byte
|
||||
bridgeReady atomic.Bool
|
||||
closed atomic.Bool
|
||||
sendQueue chan []byte
|
||||
bridgeReady atomic.Bool
|
||||
closed atomic.Bool
|
||||
reconnecting atomic.Bool
|
||||
|
||||
reconnectCh chan struct{}
|
||||
lastReconnect time.Time
|
||||
reconnectCount int
|
||||
|
||||
// peerEndpoint latches the MUC nick of the first occupant whose
|
||||
// EndpointMessage passed the bridgeMagic check. Once set, all bridge
|
||||
// messages from other senders are dropped, isolating us from chatter by
|
||||
// unrelated olcrtc processes that happen to share the same room.
|
||||
peerEndpoint atomic.Pointer[string]
|
||||
done chan struct{}
|
||||
doneOnce sync.Once
|
||||
cancel context.CancelFunc
|
||||
runCtx context.Context //nolint:containedctx // engine owns the supervisor lifetime
|
||||
wg sync.WaitGroup
|
||||
done chan struct{}
|
||||
doneOnce sync.Once
|
||||
cancel context.CancelFunc
|
||||
runCtx context.Context //nolint:containedctx // engine owns the supervisor lifetime
|
||||
wg sync.WaitGroup
|
||||
|
||||
videoTrackMu sync.RWMutex
|
||||
videoTracks []webrtc.TrackLocal
|
||||
@@ -138,14 +145,15 @@ func New(_ context.Context, cfg engine.Config) (engine.Session, error) {
|
||||
|
||||
runCtx, cancel := context.WithCancel(context.Background())
|
||||
return &Session{
|
||||
host: host,
|
||||
room: room,
|
||||
name: name,
|
||||
onData: cfg.OnData,
|
||||
sendQueue: make(chan []byte, defaultSendQueueSize),
|
||||
done: make(chan struct{}),
|
||||
cancel: cancel,
|
||||
runCtx: runCtx,
|
||||
host: host,
|
||||
room: room,
|
||||
name: name,
|
||||
onData: cfg.OnData,
|
||||
sendQueue: make(chan []byte, defaultSendQueueSize),
|
||||
reconnectCh: make(chan struct{}, 1),
|
||||
done: make(chan struct{}),
|
||||
cancel: cancel,
|
||||
runCtx: runCtx,
|
||||
}, nil
|
||||
}
|
||||
|
||||
@@ -233,6 +241,19 @@ func (s *Session) Connect(ctx context.Context) error {
|
||||
return ErrSessionClosed
|
||||
}
|
||||
|
||||
jSess, err := s.joinAndOpenBridge(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
s.jSess.Store(jSess)
|
||||
|
||||
s.wg.Add(2)
|
||||
go s.sendLoop()
|
||||
go s.recvLoop()
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Session) joinAndOpenBridge(ctx context.Context) (*j.Session, error) {
|
||||
logger.Infof("jitsi: joining %s/%s as %s …", s.host, s.room, s.name)
|
||||
jSess, err := j.Join(ctx, j.Config{
|
||||
Host: s.host,
|
||||
@@ -241,17 +262,17 @@ func (s *Session) Connect(ctx context.Context) error {
|
||||
Debug: logger.IsVerbose(),
|
||||
})
|
||||
if err != nil {
|
||||
return fmt.Errorf("jitsi join: %w", err)
|
||||
return nil, fmt.Errorf("jitsi join: %w", err)
|
||||
}
|
||||
logger.Infof("jitsi: joined %s/%s; colibri-ws=%s", s.host, s.room, jSess.ColibriWS)
|
||||
s.jSess.Store(jSess)
|
||||
|
||||
if s.onData != nil {
|
||||
bctx, bcancel := context.WithTimeout(ctx, bridgeOpenTimeout)
|
||||
err := jSess.OpenBridge(bctx)
|
||||
bcancel()
|
||||
if err != nil {
|
||||
return fmt.Errorf("open bridge: %w", err)
|
||||
_ = jSess.Close()
|
||||
return nil, fmt.Errorf("open bridge: %w", err)
|
||||
}
|
||||
// Re-latch peer on every bridge open: after a reconnect the partner's
|
||||
// MUC nick may have changed.
|
||||
@@ -263,14 +284,12 @@ func (s *Session) Connect(ctx context.Context) error {
|
||||
|
||||
if s.shouldNegotiatePC() {
|
||||
if err := s.negotiatePC(ctx, jSess); err != nil {
|
||||
return err
|
||||
_ = jSess.Close()
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
s.wg.Add(2)
|
||||
go s.sendLoop()
|
||||
go s.recvLoop()
|
||||
return nil
|
||||
return jSess, nil
|
||||
}
|
||||
|
||||
func (s *Session) shouldNegotiatePC() bool {
|
||||
@@ -477,7 +496,6 @@ func (s *Session) trickleDrainLoop(pc *webrtc.PeerConnection, neg negotiator, st
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
// xmlCandidate is a minimal XML representation of a Jingle ICE candidate.
|
||||
type xmlCandidate struct {
|
||||
Component string `xml:"component,attr"`
|
||||
@@ -495,8 +513,8 @@ type xmlCandidate struct {
|
||||
// xmlTransportInfo is the minimal structure needed to extract candidates
|
||||
// from a <jingle action="transport-info"> stanza.
|
||||
type xmlTransportInfo struct {
|
||||
XMLName xml.Name `xml:"iq"`
|
||||
Jingle struct {
|
||||
XMLName xml.Name `xml:"iq"`
|
||||
Jingle struct {
|
||||
Action string `xml:"action,attr"`
|
||||
Contents []struct {
|
||||
Name string `xml:"name,attr"`
|
||||
@@ -643,7 +661,7 @@ func (s *Session) recvLoop() {
|
||||
func (s *Session) deliverBridgeMessage(msg j.BridgeMessage, ok bool) bool {
|
||||
if !ok {
|
||||
if !s.closed.Load() {
|
||||
s.signalEnded("jitsi bridge closed")
|
||||
s.requestReconnect("jitsi bridge closed")
|
||||
}
|
||||
return false
|
||||
}
|
||||
@@ -769,26 +787,126 @@ func (s *Session) Close() error {
|
||||
}
|
||||
|
||||
// SetReconnectCallback registers a callback for reconnection events.
|
||||
//
|
||||
// The Jitsi engine itself does not currently drive a reconnect loop; the
|
||||
// callback is stored for API parity and wired through the carrier adapter
|
||||
// for future use.
|
||||
func (s *Session) SetReconnectCallback(cb func(*webrtc.DataChannel)) { s.onReconnect = cb }
|
||||
|
||||
// SetShouldReconnect stores the reconnect predicate (kept for API parity).
|
||||
// SetShouldReconnect stores the reconnect predicate.
|
||||
func (s *Session) SetShouldReconnect(fn func() bool) { s.shouldReconnect = fn }
|
||||
|
||||
// SetEndedCallback registers a function to call when the session ends.
|
||||
func (s *Session) SetEndedCallback(cb func(string)) { s.onEnded = cb }
|
||||
|
||||
// WatchConnection blocks until the session is closed, the parent context
|
||||
// fires, or the bridge tears down.
|
||||
// WatchConnection monitors bridge lifecycle and reconnects when JVB closes
|
||||
// the endpoint's colibri-ws without ending the XMPP conference.
|
||||
func (s *Session) WatchConnection(ctx context.Context) {
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-s.done:
|
||||
return
|
||||
case <-s.reconnectCh:
|
||||
if s.handleReconnectAttempt(ctx) {
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Session) requestReconnect(reason string) {
|
||||
s.bridgeReady.Store(false)
|
||||
if s.closed.Load() || s.reconnecting.Load() {
|
||||
return
|
||||
}
|
||||
if s.shouldReconnect != nil && !s.shouldReconnect() {
|
||||
s.signalEnded(reason)
|
||||
return
|
||||
}
|
||||
logger.Infof("jitsi reconnect requested: %s", reason)
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
case <-s.done:
|
||||
return
|
||||
case s.reconnectCh <- struct{}{}:
|
||||
default:
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Session) handleReconnectAttempt(ctx context.Context) bool {
|
||||
if time.Since(s.lastReconnect) > reconnectWindow {
|
||||
s.reconnectCount = 0
|
||||
}
|
||||
s.reconnectCount++
|
||||
s.lastReconnect = time.Now()
|
||||
|
||||
if s.reconnectCount > maxReconnects {
|
||||
s.signalEnded("jitsi reconnect limit reached")
|
||||
return true
|
||||
}
|
||||
|
||||
backoff := time.Duration(s.reconnectCount) * 2 * time.Second
|
||||
if backoff > 30*time.Second {
|
||||
backoff = 30 * time.Second
|
||||
}
|
||||
|
||||
for {
|
||||
if err := s.reconnect(ctx); err != nil {
|
||||
logger.Warnf("jitsi reconnect failed: %v", err)
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return true
|
||||
case <-s.done:
|
||||
return true
|
||||
case <-time.After(backoff):
|
||||
continue
|
||||
}
|
||||
}
|
||||
s.drainReconnectQueue()
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
func (s *Session) reconnect(ctx context.Context) error {
|
||||
if !s.reconnecting.CompareAndSwap(false, true) {
|
||||
return nil
|
||||
}
|
||||
defer s.reconnecting.Store(false)
|
||||
|
||||
s.bridgeReady.Store(false)
|
||||
if old := s.jSess.Swap(nil); old != nil {
|
||||
_ = old.Close()
|
||||
}
|
||||
s.pcMu.Lock()
|
||||
oldPC := s.pc
|
||||
s.pc = nil
|
||||
s.pcMu.Unlock()
|
||||
if oldPC != nil {
|
||||
_ = oldPC.Close()
|
||||
}
|
||||
|
||||
logger.Infof("jitsi: reconnecting %s/%s as %s ...", s.host, s.room, s.name)
|
||||
jSess, err := s.joinAndOpenBridge(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
s.jSess.Store(jSess)
|
||||
s.peerEndpoint.Store(nil)
|
||||
s.peerVideoSSRC.Store(0)
|
||||
s.bridgeReady.Store(true)
|
||||
|
||||
s.wg.Add(1)
|
||||
go s.recvLoop()
|
||||
|
||||
if s.onReconnect != nil {
|
||||
s.onReconnect(nil)
|
||||
}
|
||||
logger.Infof("jitsi: reconnected %s/%s; colibri-ws=%s", s.host, s.room, jSess.ColibriWS)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Session) drainReconnectQueue() {
|
||||
for {
|
||||
select {
|
||||
case <-s.reconnectCh:
|
||||
default:
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -4,8 +4,10 @@ import (
|
||||
"context"
|
||||
"errors"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/openlibrecommunity/olcrtc/internal/engine"
|
||||
"github.com/zarazaex69/j"
|
||||
)
|
||||
|
||||
const (
|
||||
@@ -186,6 +188,57 @@ func TestDeliverBridgeMessageMagicAndPeerLatch(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
func TestBridgeCloseRequestsReconnect(t *testing.T) {
|
||||
sess, err := New(context.Background(), engine.Config{
|
||||
URL: testHost,
|
||||
Extra: map[string]string{credentialKeyRoom: testRoom},
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("New: %v", err)
|
||||
}
|
||||
defer func() { _ = sess.Close() }()
|
||||
|
||||
js := sess.(*Session)
|
||||
var ended string
|
||||
js.SetEndedCallback(func(reason string) { ended = reason })
|
||||
js.SetShouldReconnect(func() bool { return true })
|
||||
|
||||
if js.deliverBridgeMessage(j.BridgeMessage{}, false) {
|
||||
t.Fatal("deliverBridgeMessage returned true on closed bridge")
|
||||
}
|
||||
select {
|
||||
case <-js.reconnectCh:
|
||||
case <-time.After(time.Second):
|
||||
t.Fatal("bridge close did not request reconnect")
|
||||
}
|
||||
if ended != "" {
|
||||
t.Fatalf("ended = %q, want empty", ended)
|
||||
}
|
||||
}
|
||||
|
||||
func TestBridgeCloseEndsWhenReconnectDisabled(t *testing.T) {
|
||||
sess, err := New(context.Background(), engine.Config{
|
||||
URL: testHost,
|
||||
Extra: map[string]string{credentialKeyRoom: testRoom},
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatalf("New: %v", err)
|
||||
}
|
||||
defer func() { _ = sess.Close() }()
|
||||
|
||||
js := sess.(*Session)
|
||||
var ended string
|
||||
js.SetEndedCallback(func(reason string) { ended = reason })
|
||||
js.SetShouldReconnect(func() bool { return false })
|
||||
|
||||
if js.deliverBridgeMessage(j.BridgeMessage{}, false) {
|
||||
t.Fatal("deliverBridgeMessage returned true on closed bridge")
|
||||
}
|
||||
if ended != "jitsi bridge closed" {
|
||||
t.Fatalf("ended = %q, want bridge close reason", ended)
|
||||
}
|
||||
}
|
||||
|
||||
func TestEngineRegistration(t *testing.T) {
|
||||
if _, err := engine.New(context.Background(), "jitsi", engine.Config{
|
||||
URL: testHost,
|
||||
|
||||
Reference in New Issue
Block a user