From d8139a7178588fada4620afbaba70211d9a7930a Mon Sep 17 00:00:00 2001 From: zarazaex69 Date: Fri, 22 May 2026 20:37:17 +0300 Subject: [PATCH] refactor(muxconn): recycle plaintext buffers via sync.Pool --- internal/crypto/chacha.go | 11 ++- internal/muxconn/conn.go | 149 +++++++++++++++++++++++++++++--------- 2 files changed, 123 insertions(+), 37 deletions(-) diff --git a/internal/crypto/chacha.go b/internal/crypto/chacha.go index 0b9e2cf..744a64a 100644 --- a/internal/crypto/chacha.go +++ b/internal/crypto/chacha.go @@ -88,6 +88,15 @@ func (c *Cipher) Encrypt(plaintext []byte) ([]byte, error) { // Decrypt decrypts ciphertext that has a nonce prepended. func (c *Cipher) Decrypt(ciphertext []byte) ([]byte, error) { + return c.DecryptInto(nil, ciphertext) +} + +// DecryptInto appends the decrypted plaintext to dst (which can be nil) +// and returns the extended slice. Pass a buffer with enough spare +// capacity from a sync.Pool to avoid per-call allocations on the hot +// path: the AEAD primitive reuses dst's backing array (no realloc) when +// cap(dst)-len(dst) >= len(ciphertext) - WireOverhead. +func (c *Cipher) DecryptInto(dst, ciphertext []byte) ([]byte, error) { nonceSize := c.aead.NonceSize() if len(ciphertext) < nonceSize { return nil, ErrCiphertextTooShort @@ -96,7 +105,7 @@ func (c *Cipher) Decrypt(ciphertext []byte) ([]byte, error) { nonce := ciphertext[:nonceSize] encrypted := ciphertext[nonceSize:] - res, err := c.aead.Open(nil, nonce, encrypted, nil) + res, err := c.aead.Open(dst, nonce, encrypted, nil) if err != nil { return nil, fmt.Errorf("failed to decrypt: %w", err) } diff --git a/internal/muxconn/conn.go b/internal/muxconn/conn.go index b2500ad..9fdb4cc 100644 --- a/internal/muxconn/conn.go +++ b/internal/muxconn/conn.go @@ -33,13 +33,53 @@ import ( // ErrClosed is returned from Read/Write after the conn has been closed. var ErrClosed = errors.New("muxconn: closed") -// inboundQueue is the buffered capacity of the Push -> Read pipeline. 
-// It absorbs short Read stalls without applying back-pressure to the -// transport callback. Frames are typically smux-sized (well under -// defaultMaxPayloadSize == 12 KiB), so 256 amounts to a few MiB of -// in-flight data, which is enough for sustained throughput on every -// transport we have without unbounded growth on a stuck reader. -const inboundQueue = 256 +const ( + // inboundQueue is the buffered capacity of the Push -> Read pipeline. + // It absorbs short Read stalls without applying back-pressure to the + // transport callback. Frames are typically smux-sized (well under 16 KiB), + // but each one pins a pooledFrameCap buffer while queued, so a full queue + // retains about 16 MiB per Conn. That is enough for sustained throughput + // on every transport we have without unbounded growth on a stuck reader. + inboundQueue = 256 + + // pooledFrameCap is the capacity each pooled plaintext buffer is born + // with. It is sized to fit the largest smux frame any of our + // transports will deliver after AEAD overhead is stripped (datachannel + // caps at 12 KiB on the wire, vp8channel at 60 KiB; we round up to + // give Open room to write in place without growing the slice). + pooledFrameCap = 64 * 1024 +) + +// frameBufPool recycles plaintext buffers between Push (decrypts a wire +// frame into a buffer) and Read (consumes the buffer fully then returns +// it). It is global so all Conn instances share the same hot cache — +// most clients in the same process talk to a handful of peers, and +// per-Conn pools fragment the warm set unnecessarily. 
+var frameBufPool = sync.Pool{ //nolint:gochecknoglobals // intentional process-wide buffer pool + New: func() any { + b := make([]byte, 0, pooledFrameCap) + return &b + }, +} + +func acquireFrameBuf() *[]byte { + bp := frameBufPool.Get().(*[]byte) //nolint:forcetypeassert // pool only ever holds *[]byte + *bp = (*bp)[:0] + return bp +} + +func releaseFrameBuf(bp *[]byte) { + if bp == nil { + return + } + // Drop oversized buffers so a one-off huge frame can't poison the + // pool's working set forever. + if cap(*bp) > pooledFrameCap*2 { + return + } + *bp = (*bp)[:0] + frameBufPool.Put(bp) +} // Conn is an io.ReadWriteCloser over a [transport.Transport] with optional AEAD wrapping. // @@ -48,20 +88,25 @@ const inboundQueue = 256 // as needed. The hot path is lock-free: a single producer (the transport // callback) and a single consumer (smux's read loop) communicate via a // buffered channel without any cond/mutex ping-pong. +// +// Plaintext buffers are recycled through frameBufPool: Push borrows a +// buffer to decrypt into, ships it through the channel, and Read returns +// the buffer to the pool once its caller has consumed all the bytes. type Conn struct { ln transport.Transport send func([]byte) error cipher *crypto.Cipher - in chan []byte + in chan *[]byte closeOnce sync.Once closeCh chan struct{} closed atomic.Bool - // leftover holds the unread tail of the most recent frame popped - // from `in`. It is touched only by Read and so needs no - // synchronization. - leftover []byte + // leftoverBuf holds the pool buffer whose tail is still in + // `leftover`. When `leftover` empties we return leftoverBuf to the + // pool and clear both fields. Touched only by Read. + leftoverBuf *[]byte + leftover []byte } // New wires a Conn over the given transport. 
Push must be set as the @@ -71,7 +116,7 @@ func New(ln transport.Transport, cipher *crypto.Cipher) *Conn { ln: ln, send: ln.Send, cipher: cipher, - in: make(chan []byte, inboundQueue), + in: make(chan *[]byte, inboundQueue), closeCh: make(chan struct{}), } } @@ -84,29 +129,35 @@ func NewPeer(ln transport.PeerTransport, cipher *crypto.Cipher, peerID string) * return ln.SendTo(peerID, data) }, cipher: cipher, - in: make(chan []byte, inboundQueue), + in: make(chan *[]byte, inboundQueue), closeCh: make(chan struct{}), } } // Push hands an encrypted wire payload (one OnData event) to the conn. // -// On the producer side: decrypt, then either deliver via the inbound -// channel or, if the caller has Close'd or back-pressure can't drain in -// time, drop the frame. Blocking forever here would wedge the transport -// callback and trip its watchdog, so we cap waiting on closeCh. +// On the producer side: borrow a pooled plaintext buffer, decrypt into +// it, then either deliver via the inbound channel or, if the caller has +// Close'd, return the buffer to the pool. Blocking forever on a wedged +// reader would wedge the transport callback and trip its watchdog, so we +// also bail on closeCh. 
func (c *Conn) Push(ciphertext []byte) { - pt, err := c.cipher.Decrypt(ciphertext) + bufPtr := acquireFrameBuf() + pt, err := c.cipher.DecryptInto(*bufPtr, ciphertext) if err != nil { + releaseFrameBuf(bufPtr) logger.Debugf("muxconn: decrypt failed, dropping frame: %v", err) return } + *bufPtr = pt if c.closed.Load() { + releaseFrameBuf(bufPtr) return } select { - case c.in <- pt: + case c.in <- bufPtr: case <-c.closeCh: + releaseFrameBuf(bufPtr) } } @@ -120,39 +171,34 @@ func (c *Conn) Read(p []byte) (int, error) { return 0, nil } if len(c.leftover) == 0 { - select { - case data, ok := <-c.in: - if !ok { - return 0, io.EOF - } - c.leftover = data - case <-c.closeCh: - // Drain any bytes that landed before close so a peer that - // shut us down right after a final write doesn't lose data. - select { - case data := <-c.in: - c.leftover = data - default: - return 0, io.EOF - } + bufPtr, ok := c.takeFrame() + if !ok { + return 0, io.EOF } + c.leftoverBuf = bufPtr + c.leftover = *bufPtr } n := copy(p, c.leftover) c.leftover = c.leftover[n:] + c.recycleIfDrained() // Greedily pull additional frames already sitting in the queue, // without blocking. This keeps the channel from accumulating a // backlog when the consumer asks for a large buffer. for n < len(p) && len(c.leftover) == 0 { select { - case data, ok := <-c.in: + case bufPtr, ok := <-c.in: if !ok { return n, nil } + data := *bufPtr m := copy(p[n:], data) n += m if m < len(data) { + c.leftoverBuf = bufPtr c.leftover = data[m:] + } else { + releaseFrameBuf(bufPtr) } default: return n, nil @@ -161,6 +207,37 @@ func (c *Conn) Read(p []byte) (int, error) { return n, nil } +// takeFrame blocks until a frame is available or the conn is closed. +// On a clean close it still drains any frame that landed before the +// close signal won the race, so a peer that shuts us down right after a +// final write doesn't lose data. 
+func (c *Conn) takeFrame() (*[]byte, bool) { + select { + case bufPtr, ok := <-c.in: + if !ok { + return nil, false + } + return bufPtr, true + case <-c.closeCh: + select { + case bufPtr, ok := <-c.in: + if !ok { + return nil, false + } + return bufPtr, true + default: + return nil, false + } + } +} + +func (c *Conn) recycleIfDrained() { + if len(c.leftover) == 0 && c.leftoverBuf != nil { + releaseFrameBuf(c.leftoverBuf) + c.leftoverBuf, c.leftover = nil, nil // also drop the stale alias into the recycled buffer + } +} + // Write encrypts p and ships it to the link as a single message. Blocks while // the link signals back-pressure. func (c *Conn) Write(p []byte) (int, error) {