mirror of
https://github.com/openlibrecommunity/olcrtc.git
synced 2026-05-26 07:08:11 +00:00
fix(salutejazz): bound session close on wedged pc shutdown
This commit is contained in:
@@ -134,3 +134,28 @@ func TestShutdownWebSocketIsIdempotent(t *testing.T) {
|
||||
go func() { defer wg.Done(); s.shutdownWebSocket() }()
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
// TestCloseWithDeadlineDoesNotBlockOnStraggler pins down that a wedged
|
||||
// PeerConnection.Close (modeled here as a never-returning closer) does not
|
||||
// hold up Session.Close past its budget. The historical failure mode showed
|
||||
// up in the real e2e matrix as "tunnel goroutine did not stop: client" when
|
||||
// pion's TURN refresh storm kept the ICE agent alive long after the test
|
||||
// asked it to shut down.
|
||||
func TestCloseWithDeadlineDoesNotBlockOnStraggler(t *testing.T) {
|
||||
deadline := 50 * time.Millisecond
|
||||
closers := []func() error{
|
||||
func() error { return nil },
|
||||
func() error { select {} }, //nolint:revive // intentional block to model a wedged pion close
|
||||
}
|
||||
|
||||
start := time.Now()
|
||||
closeWithDeadline(closers, deadline)
|
||||
elapsed := time.Since(start)
|
||||
|
||||
if elapsed > deadline*4 {
|
||||
t.Fatalf("closeWithDeadline blocked for %s, expected ~%s", elapsed, deadline)
|
||||
}
|
||||
if elapsed < deadline {
|
||||
t.Fatalf("closeWithDeadline returned in %s before deadline %s; should have waited for the straggler", elapsed, deadline)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -57,6 +57,7 @@ const (
|
||||
wsHandshakeTimeout = 15 * time.Second
|
||||
sendQueueTimeout = 50 * time.Millisecond
|
||||
closeWaitTimeout = 2 * time.Second
|
||||
pcCloseTimeout = 3 * time.Second
|
||||
subscriberOfferGap = 300 * time.Millisecond
|
||||
audioFrameDuration = 20 * time.Millisecond
|
||||
)
|
||||
@@ -1017,6 +1018,14 @@ func (s *Session) processSendQueue() {
|
||||
// past the deadline). The data channel and peer connections are torn down
|
||||
// after the WS so that any final ICE / signaling cleanup the goroutines do
|
||||
// on their way out still has somewhere to write.
|
||||
//
|
||||
// pion's PeerConnection.Close blocks until the ICE agent and its TURN
|
||||
// allocations drain; on the jazz e2e runner most relays get rejected with
|
||||
// "403: Forbidden IP" and the agent keeps logging "Failed to handle
|
||||
// message: the agent is closed" every 2s while it churns through them. We
|
||||
// fire dc/pc closes in parallel and cap them with pcCloseTimeout so a
|
||||
// stuck pion goroutine never holds up the carrier link teardown past the
|
||||
// e2e harness's 20s budget.
|
||||
func (s *Session) Close() error {
|
||||
s.closed.Store(true)
|
||||
s.sendQueueClosed.Store(true)
|
||||
@@ -1035,18 +1044,48 @@ func (s *Session) Close() error {
|
||||
case <-time.After(closeWaitTimeout):
|
||||
}
|
||||
|
||||
closers := make([]func() error, 0, 3)
|
||||
if s.dc != nil {
|
||||
_ = s.dc.Close()
|
||||
closers = append(closers, s.dc.Close)
|
||||
}
|
||||
if s.pcPub != nil {
|
||||
_ = s.pcPub.Close()
|
||||
closers = append(closers, s.pcPub.Close)
|
||||
}
|
||||
if s.pcSub != nil {
|
||||
_ = s.pcSub.Close()
|
||||
closers = append(closers, s.pcSub.Close)
|
||||
}
|
||||
closeWithDeadline(closers, pcCloseTimeout)
|
||||
return nil
|
||||
}
|
||||
|
||||
// closeWithDeadline runs the supplied Close funcs concurrently and returns
|
||||
// once all of them have returned OR the deadline elapses, whichever comes
|
||||
// first. Stragglers (typically a pion PeerConnection.Close waiting on a
|
||||
// wedged TURN allocation) are left to finish in the background so they
|
||||
// don't block carrier-link teardown.
|
||||
func closeWithDeadline(closers []func() error, timeout time.Duration) {
|
||||
if len(closers) == 0 {
|
||||
return
|
||||
}
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(len(closers))
|
||||
for _, fn := range closers {
|
||||
go func(fn func() error) {
|
||||
defer wg.Done()
|
||||
_ = fn()
|
||||
}(fn)
|
||||
}
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
wg.Wait()
|
||||
close(done)
|
||||
}()
|
||||
select {
|
||||
case <-done:
|
||||
case <-time.After(timeout):
|
||||
}
|
||||
}
|
||||
|
||||
// shutdownWebSocket politely closes the connector WebSocket and trips its
|
||||
// read deadline to the past so any blocked ReadJSON in handleSignaling
|
||||
// returns immediately. The conn pointer is left intact on purpose: writers
|
||||
|
||||
Reference in New Issue
Block a user