mirror of
https://github.com/openlibrecommunity/olcrtc.git
synced 2026-06-08 05:14:42 +00:00
fix(server,client): close old muxconn before session swap
This commit is contained in:
@@ -335,6 +335,16 @@ func (c *Client) handleReconnect(ctx context.Context, cfg Config, cancel context
|
||||
logger.Infof("client reconnect reason=%s - tearing down smux session", reason)
|
||||
c.resetLinkPeer()
|
||||
|
||||
// Close the old muxconn immediately so any in-flight Push from data
|
||||
// arriving on the new bridge is discarded. Without this, the server
|
||||
// side that reconnected faster can push frames into our old muxconn,
|
||||
// corrupting the dying smux session.
|
||||
c.sessMu.RLock()
|
||||
if c.conn != nil {
|
||||
_ = c.conn.Close()
|
||||
}
|
||||
c.sessMu.RUnlock()
|
||||
|
||||
// Install a fresh muxconn immediately so onData never hits nil while
|
||||
// the old session is being torn down. tryReopenSession will swap it
|
||||
// again with its own conn on each attempt.
|
||||
@@ -344,7 +354,6 @@ func (c *Client) handleReconnect(ctx context.Context, cfg Config, cancel context
|
||||
oldControl := c.controlStrm
|
||||
oldControlStop := c.controlStop
|
||||
oldSess := c.session
|
||||
oldConn := c.conn
|
||||
c.conn = newConn
|
||||
c.session = nil
|
||||
c.controlStrm = nil
|
||||
@@ -358,9 +367,6 @@ func (c *Client) handleReconnect(ctx context.Context, cfg Config, cancel context
|
||||
if oldSess != nil {
|
||||
_ = oldSess.Close()
|
||||
}
|
||||
if oldConn != nil {
|
||||
_ = oldConn.Close()
|
||||
}
|
||||
if oldControl != nil {
|
||||
_ = oldControl.Close()
|
||||
}
|
||||
|
||||
@@ -317,6 +317,19 @@ func (s *Server) reinstallSession(dead *smux.Session) {
|
||||
s.reinstallMu.Lock()
|
||||
defer s.reinstallMu.Unlock()
|
||||
|
||||
// Close the old muxconn immediately so that any in-flight Push calls
|
||||
// (from data arriving on a new bridge before this reinstall completes)
|
||||
// are discarded rather than feeding stale frames into the dying smux
|
||||
// session. Without this, a client that reconnects faster than the
|
||||
// server can push new-session smux frames into the old muxconn,
|
||||
// corrupting the old smux session's stream state (manifests as
|
||||
// "frame too large" on the control stream).
|
||||
s.sessMu.RLock()
|
||||
if s.conn != nil {
|
||||
_ = s.conn.Close()
|
||||
}
|
||||
s.sessMu.RUnlock()
|
||||
|
||||
// Pre-build the replacement so we can swap atomically below.
|
||||
newConn := muxconn.New(s.ln, s.cipher)
|
||||
newSess, err := smux.Server(newConn, smuxConfig(linkMaxPayload(s.ln)))
|
||||
@@ -335,7 +348,6 @@ func (s *Server) reinstallSession(dead *smux.Session) {
|
||||
return
|
||||
}
|
||||
oldSess := s.session
|
||||
oldConn := s.conn
|
||||
oldControl := s.controlStrm
|
||||
oldControlStop := s.controlStop
|
||||
oldSID := s.sessionID
|
||||
@@ -353,9 +365,6 @@ func (s *Server) reinstallSession(dead *smux.Session) {
|
||||
if oldSess != nil {
|
||||
_ = oldSess.Close()
|
||||
}
|
||||
if oldConn != nil {
|
||||
_ = oldConn.Close()
|
||||
}
|
||||
if oldControl != nil {
|
||||
_ = oldControl.Close()
|
||||
}
|
||||
|
||||
@@ -667,3 +667,67 @@ func TestDispatchFiresOnTraffic(t *testing.T) {
|
||||
t.Fatalf("bytesOut = %d, want >= %d", rec.out, len(greeting))
|
||||
}
|
||||
}
|
||||
|
||||
func TestReinstallSessionClosesOldConnBeforeSwap(t *testing.T) {
|
||||
// Regression test: after carrier reconnect, a client that reconnects
|
||||
// faster can push smux frames into the server's old muxconn before
|
||||
// reinstallSession swaps it out. This corrupts the old smux session
|
||||
// and manifests as "frame too large" on the control stream.
|
||||
// The fix closes the old muxconn at the very start of reinstallSession
|
||||
// so Push calls during the swap window are discarded.
|
||||
cipher, err := cryptopkg.NewCipher("01234567890123456789012345678901")
|
||||
if err != nil {
|
||||
t.Fatalf("NewCipher() error = %v", err)
|
||||
}
|
||||
ln := &serverLinkStub{}
|
||||
conn := muxconn.New(ln, cipher)
|
||||
sess, err := smux.Server(conn, smuxConfig(0))
|
||||
if err != nil {
|
||||
t.Fatalf("smux.Server() error = %v", err)
|
||||
}
|
||||
s := &Server{
|
||||
ln: ln,
|
||||
cipher: cipher,
|
||||
conn: conn,
|
||||
session: sess,
|
||||
onClose: func(string, string) {},
|
||||
health: runtime.NewHealthTracker(nil),
|
||||
peerSessions: make(map[string]*peerSession),
|
||||
}
|
||||
|
||||
// Simulate the race: push data into old conn WHILE reinstallSession
|
||||
// is running (in a separate goroutine).
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
defer close(done)
|
||||
s.reinstallSession(sess)
|
||||
}()
|
||||
|
||||
// Give reinstallSession a moment to close the old conn.
|
||||
time.Sleep(5 * time.Millisecond)
|
||||
|
||||
// This simulates data arriving from a new bridge (fast-reconnecting client).
|
||||
// With the fix, Push should be a no-op (conn is already closed).
|
||||
// Without the fix, this would feed into the dying smux session.
|
||||
conn.Push([]byte("stale encrypted garbage"))
|
||||
|
||||
<-done
|
||||
|
||||
// Verify old conn is closed and new conn is installed.
|
||||
s.sessMu.RLock()
|
||||
newConn := s.conn
|
||||
newSess := s.session
|
||||
s.sessMu.RUnlock()
|
||||
|
||||
if newConn == conn {
|
||||
t.Fatal("reinstallSession did not swap conn")
|
||||
}
|
||||
if newSess == sess {
|
||||
t.Fatal("reinstallSession did not swap session")
|
||||
}
|
||||
if newConn == nil || newSess == nil {
|
||||
t.Fatal("reinstallSession left nil conn or session")
|
||||
}
|
||||
_ = newSess.Close()
|
||||
_ = newConn.Close()
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user