mirror of
https://github.com/openlibrecommunity/olcrtc.git
synced 2026-05-26 07:08:11 +00:00
refactor(muxconn): recycle plaintext buffers via sync.Pool
This commit is contained in:
@@ -88,6 +88,15 @@ func (c *Cipher) Encrypt(plaintext []byte) ([]byte, error) {
|
||||
|
||||
// Decrypt decrypts ciphertext that has a nonce prepended.
|
||||
func (c *Cipher) Decrypt(ciphertext []byte) ([]byte, error) {
|
||||
return c.DecryptInto(nil, ciphertext)
|
||||
}
|
||||
|
||||
// DecryptInto appends the decrypted plaintext to dst (which can be nil)
|
||||
// and returns the extended slice. Pass a buffer with enough spare
|
||||
// capacity from a sync.Pool to avoid per-call allocations on the hot
|
||||
// path: the AEAD primitive will write the plaintext in place when
|
||||
// cap(dst) >= len(ciphertext) - WireOverhead.
|
||||
func (c *Cipher) DecryptInto(dst, ciphertext []byte) ([]byte, error) {
|
||||
nonceSize := c.aead.NonceSize()
|
||||
if len(ciphertext) < nonceSize {
|
||||
return nil, ErrCiphertextTooShort
|
||||
@@ -96,7 +105,7 @@ func (c *Cipher) Decrypt(ciphertext []byte) ([]byte, error) {
|
||||
nonce := ciphertext[:nonceSize]
|
||||
encrypted := ciphertext[nonceSize:]
|
||||
|
||||
res, err := c.aead.Open(nil, nonce, encrypted, nil)
|
||||
res, err := c.aead.Open(dst, nonce, encrypted, nil)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to decrypt: %w", err)
|
||||
}
|
||||
|
||||
@@ -33,13 +33,53 @@ import (
|
||||
// ErrClosed is returned from Read/Write after the conn has been closed.
|
||||
var ErrClosed = errors.New("muxconn: closed")
|
||||
|
||||
// inboundQueue is the buffered capacity of the Push -> Read pipeline.
|
||||
// It absorbs short Read stalls without applying back-pressure to the
|
||||
// transport callback. Frames are typically smux-sized (well under
|
||||
// defaultMaxPayloadSize == 12 KiB), so 256 amounts to a few MiB of
|
||||
// in-flight data, which is enough for sustained throughput on every
|
||||
// transport we have without unbounded growth on a stuck reader.
|
||||
const inboundQueue = 256
|
||||
const (
|
||||
// inboundQueue is the buffered capacity of the Push -> Read pipeline.
|
||||
// It absorbs short Read stalls without applying back-pressure to the
|
||||
// transport callback. Frames are typically smux-sized (well under
|
||||
// 16 KiB), so 256 amounts to a few MiB of in-flight data, which is
|
||||
// enough for sustained throughput on every transport we have without
|
||||
// unbounded growth on a stuck reader.
|
||||
inboundQueue = 256
|
||||
|
||||
// pooledFrameCap is the capacity each pooled plaintext buffer is born
|
||||
// with. It is sized to fit the largest smux frame any of our
|
||||
// transports will deliver after AEAD overhead is stripped (datachannel
|
||||
// caps at 12 KiB on the wire, vp8channel at 60 KiB; we round up to
|
||||
// give Open room to write in place without growing the slice).
|
||||
pooledFrameCap = 64 * 1024
|
||||
)
|
||||
|
||||
// frameBufPool recycles plaintext buffers between Push (decrypts a wire
|
||||
// frame into a buffer) and Read (consumes the buffer fully then returns
|
||||
// it). It is global so all Conn instances share the same hot cache —
|
||||
// most clients in the same process talk to a handful of peers, and
|
||||
// per-Conn pools fragment the warm set unnecessarily.
|
||||
var frameBufPool = sync.Pool{ //nolint:gochecknoglobals // intentional process-wide buffer pool
|
||||
New: func() any {
|
||||
b := make([]byte, 0, pooledFrameCap)
|
||||
return &b
|
||||
},
|
||||
}
|
||||
|
||||
func acquireFrameBuf() *[]byte {
|
||||
bp := frameBufPool.Get().(*[]byte) //nolint:forcetypeassert // pool only ever holds *[]byte
|
||||
*bp = (*bp)[:0]
|
||||
return bp
|
||||
}
|
||||
|
||||
func releaseFrameBuf(bp *[]byte) {
|
||||
if bp == nil {
|
||||
return
|
||||
}
|
||||
// Drop oversized buffers so a one-off huge frame can't poison the
|
||||
// pool's working set forever.
|
||||
if cap(*bp) > pooledFrameCap*2 {
|
||||
return
|
||||
}
|
||||
*bp = (*bp)[:0]
|
||||
frameBufPool.Put(bp)
|
||||
}
|
||||
|
||||
// Conn is an io.ReadWriteCloser over a [transport.Transport] with optional AEAD wrapping.
|
||||
//
|
||||
@@ -48,19 +88,24 @@ const inboundQueue = 256
|
||||
// as needed. The hot path is lock-free: a single producer (the transport
|
||||
// callback) and a single consumer (smux's read loop) communicate via a
|
||||
// buffered channel without any cond/mutex ping-pong.
|
||||
//
|
||||
// Plaintext buffers are recycled through frameBufPool: Push borrows a
|
||||
// buffer to decrypt into, ships it through the channel, and Read returns
|
||||
// the buffer to the pool once its caller has consumed all the bytes.
|
||||
type Conn struct {
|
||||
ln transport.Transport
|
||||
send func([]byte) error
|
||||
cipher *crypto.Cipher
|
||||
|
||||
in chan []byte
|
||||
in chan *[]byte
|
||||
closeOnce sync.Once
|
||||
closeCh chan struct{}
|
||||
closed atomic.Bool
|
||||
|
||||
// leftover holds the unread tail of the most recent frame popped
|
||||
// from `in`. It is touched only by Read and so needs no
|
||||
// synchronization.
|
||||
// leftoverBuf holds the pool buffer whose tail is still in
|
||||
// `leftover`. When `leftover` empties we return leftoverBuf to the
|
||||
// pool and clear both fields. Touched only by Read.
|
||||
leftoverBuf *[]byte
|
||||
leftover []byte
|
||||
}
|
||||
|
||||
@@ -71,7 +116,7 @@ func New(ln transport.Transport, cipher *crypto.Cipher) *Conn {
|
||||
ln: ln,
|
||||
send: ln.Send,
|
||||
cipher: cipher,
|
||||
in: make(chan []byte, inboundQueue),
|
||||
in: make(chan *[]byte, inboundQueue),
|
||||
closeCh: make(chan struct{}),
|
||||
}
|
||||
}
|
||||
@@ -84,29 +129,35 @@ func NewPeer(ln transport.PeerTransport, cipher *crypto.Cipher, peerID string) *
|
||||
return ln.SendTo(peerID, data)
|
||||
},
|
||||
cipher: cipher,
|
||||
in: make(chan []byte, inboundQueue),
|
||||
in: make(chan *[]byte, inboundQueue),
|
||||
closeCh: make(chan struct{}),
|
||||
}
|
||||
}
|
||||
|
||||
// Push hands an encrypted wire payload (one OnData event) to the conn.
|
||||
//
|
||||
// On the producer side: decrypt, then either deliver via the inbound
|
||||
// channel or, if the caller has Close'd or back-pressure can't drain in
|
||||
// time, drop the frame. Blocking forever here would wedge the transport
|
||||
// callback and trip its watchdog, so we cap waiting on closeCh.
|
||||
// On the producer side: borrow a pooled plaintext buffer, decrypt into
|
||||
// it, then either deliver via the inbound channel or, if the caller has
|
||||
// Close'd, return the buffer to the pool. Blocking forever on a wedged
|
||||
// reader would wedge the transport callback and trip its watchdog, so we
|
||||
// also bail on closeCh.
|
||||
func (c *Conn) Push(ciphertext []byte) {
|
||||
pt, err := c.cipher.Decrypt(ciphertext)
|
||||
bufPtr := acquireFrameBuf()
|
||||
pt, err := c.cipher.DecryptInto(*bufPtr, ciphertext)
|
||||
if err != nil {
|
||||
releaseFrameBuf(bufPtr)
|
||||
logger.Debugf("muxconn: decrypt failed, dropping frame: %v", err)
|
||||
return
|
||||
}
|
||||
*bufPtr = pt
|
||||
if c.closed.Load() {
|
||||
releaseFrameBuf(bufPtr)
|
||||
return
|
||||
}
|
||||
select {
|
||||
case c.in <- pt:
|
||||
case c.in <- bufPtr:
|
||||
case <-c.closeCh:
|
||||
releaseFrameBuf(bufPtr)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -120,39 +171,34 @@ func (c *Conn) Read(p []byte) (int, error) {
|
||||
return 0, nil
|
||||
}
|
||||
if len(c.leftover) == 0 {
|
||||
select {
|
||||
case data, ok := <-c.in:
|
||||
bufPtr, ok := c.takeFrame()
|
||||
if !ok {
|
||||
return 0, io.EOF
|
||||
}
|
||||
c.leftover = data
|
||||
case <-c.closeCh:
|
||||
// Drain any bytes that landed before close so a peer that
|
||||
// shut us down right after a final write doesn't lose data.
|
||||
select {
|
||||
case data := <-c.in:
|
||||
c.leftover = data
|
||||
default:
|
||||
return 0, io.EOF
|
||||
}
|
||||
}
|
||||
c.leftoverBuf = bufPtr
|
||||
c.leftover = *bufPtr
|
||||
}
|
||||
n := copy(p, c.leftover)
|
||||
c.leftover = c.leftover[n:]
|
||||
c.recycleIfDrained()
|
||||
|
||||
// Greedily pull additional frames already sitting in the queue,
|
||||
// without blocking. This keeps the channel from accumulating a
|
||||
// backlog when the consumer asks for a large buffer.
|
||||
for n < len(p) && len(c.leftover) == 0 {
|
||||
select {
|
||||
case data, ok := <-c.in:
|
||||
case bufPtr, ok := <-c.in:
|
||||
if !ok {
|
||||
return n, nil
|
||||
}
|
||||
data := *bufPtr
|
||||
m := copy(p[n:], data)
|
||||
n += m
|
||||
if m < len(data) {
|
||||
c.leftoverBuf = bufPtr
|
||||
c.leftover = data[m:]
|
||||
} else {
|
||||
releaseFrameBuf(bufPtr)
|
||||
}
|
||||
default:
|
||||
return n, nil
|
||||
@@ -161,6 +207,37 @@ func (c *Conn) Read(p []byte) (int, error) {
|
||||
return n, nil
|
||||
}
|
||||
|
||||
// takeFrame blocks until a frame is available or the conn is closed.
|
||||
// On a clean close it still drains any frame that landed before the
|
||||
// close signal won the race, so a peer that shuts us down right after a
|
||||
// final write doesn't lose data.
|
||||
func (c *Conn) takeFrame() (*[]byte, bool) {
|
||||
select {
|
||||
case bufPtr, ok := <-c.in:
|
||||
if !ok {
|
||||
return nil, false
|
||||
}
|
||||
return bufPtr, true
|
||||
case <-c.closeCh:
|
||||
select {
|
||||
case bufPtr, ok := <-c.in:
|
||||
if !ok {
|
||||
return nil, false
|
||||
}
|
||||
return bufPtr, true
|
||||
default:
|
||||
return nil, false
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Conn) recycleIfDrained() {
|
||||
if len(c.leftover) == 0 && c.leftoverBuf != nil {
|
||||
releaseFrameBuf(c.leftoverBuf)
|
||||
c.leftoverBuf = nil
|
||||
}
|
||||
}
|
||||
|
||||
// Write encrypts p and ships it to the link as a single message. Blocks while
|
||||
// the link signals back-pressure.
|
||||
func (c *Conn) Write(p []byte) (int, error) {
|
||||
|
||||
Reference in New Issue
Block a user