mirror of
https://github.com/openlibrecommunity/olcrtc.git
synced 2026-05-26 15:13:40 +00:00
Merge branch 'refactor/optimize'
Data-plane micro-optimisations: perf(crypto): counter-based nonce removes per-frame getrandom syscall refactor(muxconn): replace cond+mutex buffer with channel pipeline refactor(muxconn): recycle plaintext buffers via sync.Pool Local soak (datachannel, 120s) on this dev box: 81.6 -> 93.1 MiB/s (+14%), total CPU -10.8%, GC scan time -24%. Wire format unchanged.
This commit is contained in:
@@ -4,8 +4,10 @@ package crypto
|
||||
import (
|
||||
"crypto/cipher"
|
||||
"crypto/rand"
|
||||
"encoding/binary"
|
||||
"errors"
|
||||
"fmt"
|
||||
"sync/atomic"
|
||||
|
||||
"golang.org/x/crypto/chacha20poly1305"
|
||||
)
|
||||
@@ -20,9 +22,30 @@ var (
|
||||
ErrCiphertextTooShort = errors.New("ciphertext too short")
|
||||
)
|
||||
|
||||
// Cipher provides AEAD encryption and decryption using ChaCha20-Poly1305.
|
||||
// nonceSaltSize is the prefix of the XChaCha20 24-byte nonce that is
|
||||
// chosen randomly once at Cipher construction. The remaining 8 bytes
|
||||
// hold a monotonic counter incremented on every Encrypt call. With a
|
||||
// fresh per-Cipher salt and a 64-bit counter, the (salt, counter) pair
|
||||
// is unique for every encryption operation as long as the same Cipher
|
||||
// instance is used (>10^19 messages before counter wrap).
|
||||
const nonceSaltSize = chacha20poly1305.NonceSizeX - 8
|
||||
|
||||
// Cipher provides AEAD encryption and decryption using XChaCha20-Poly1305.
|
||||
//
|
||||
// Nonces are generated deterministically as `salt || counter` where the
|
||||
// salt is a per-instance random 16-byte prefix and the counter is a
|
||||
// monotonic 64-bit suffix. This avoids the syscall + global lock that
|
||||
// crypto/rand.Read would impose on every encrypt call, which dominated
|
||||
// the data-plane CPU profile under sustained throughput.
|
||||
//
|
||||
// The wire format is unchanged: ciphertexts are still [24-byte nonce]
|
||||
// [encrypted payload][16-byte tag], so a peer using the previous
|
||||
// random-nonce implementation can decrypt messages produced here, and
|
||||
// vice versa.
|
||||
type Cipher struct {
|
||||
aead cipher.AEAD
|
||||
aead cipher.AEAD
|
||||
salt [nonceSaltSize]byte
|
||||
counter atomic.Uint64
|
||||
}
|
||||
|
||||
// NewCipher creates a new Cipher instance with the given 32-byte key.
|
||||
@@ -37,22 +60,43 @@ func NewCipher(keyStr string) (*Cipher, error) {
|
||||
return nil, fmt.Errorf("failed to create aead: %w", err)
|
||||
}
|
||||
|
||||
return &Cipher{aead: aead}, nil
|
||||
}
|
||||
|
||||
// Encrypt encrypts plaintext and prepends a random nonce.
|
||||
func (c *Cipher) Encrypt(plaintext []byte) ([]byte, error) {
|
||||
nonce := make([]byte, c.aead.NonceSize())
|
||||
if _, err := rand.Read(nonce); err != nil {
|
||||
return nil, fmt.Errorf("failed to generate nonce: %w", err)
|
||||
c := &Cipher{aead: aead}
|
||||
if _, err := rand.Read(c.salt[:]); err != nil {
|
||||
return nil, fmt.Errorf("failed to seed nonce salt: %w", err)
|
||||
}
|
||||
|
||||
// Seal appends the ciphertext to the nonce
|
||||
return c.aead.Seal(nonce, nonce, plaintext, nil), nil
|
||||
return c, nil
|
||||
}
|
||||
|
||||
// Encrypt encrypts plaintext and prepends a deterministic per-message
|
||||
// nonce (random per-instance salt + monotonic counter).
|
||||
//
|
||||
// Allocates a single output buffer sized exactly for the resulting
|
||||
// ciphertext, so AEAD.Seal does not have to grow the slice.
|
||||
func (c *Cipher) Encrypt(plaintext []byte) ([]byte, error) {
|
||||
nonceSize := c.aead.NonceSize()
|
||||
overhead := c.aead.Overhead()
|
||||
|
||||
// One alloc, sized for the full output: nonce || sealed(plaintext+tag).
|
||||
out := make([]byte, nonceSize, nonceSize+len(plaintext)+overhead)
|
||||
|
||||
copy(out[:nonceSaltSize], c.salt[:])
|
||||
binary.BigEndian.PutUint64(out[nonceSaltSize:nonceSize], c.counter.Add(1))
|
||||
|
||||
return c.aead.Seal(out, out[:nonceSize], plaintext, nil), nil
|
||||
}
|
||||
|
||||
// Decrypt decrypts ciphertext that has a nonce prepended.
|
||||
func (c *Cipher) Decrypt(ciphertext []byte) ([]byte, error) {
|
||||
return c.DecryptInto(nil, ciphertext)
|
||||
}
|
||||
|
||||
// DecryptInto appends the decrypted plaintext to dst (which can be nil)
|
||||
// and returns the extended slice. Pass a buffer with enough spare
|
||||
// capacity from a sync.Pool to avoid per-call allocations on the hot
|
||||
// path: the AEAD primitive will write the plaintext in place when
|
||||
// cap(dst) >= len(ciphertext) - WireOverhead.
|
||||
func (c *Cipher) DecryptInto(dst, ciphertext []byte) ([]byte, error) {
|
||||
nonceSize := c.aead.NonceSize()
|
||||
if len(ciphertext) < nonceSize {
|
||||
return nil, ErrCiphertextTooShort
|
||||
@@ -61,7 +105,7 @@ func (c *Cipher) Decrypt(ciphertext []byte) ([]byte, error) {
|
||||
nonce := ciphertext[:nonceSize]
|
||||
encrypted := ciphertext[nonceSize:]
|
||||
|
||||
res, err := c.aead.Open(nil, nonce, encrypted, nil)
|
||||
res, err := c.aead.Open(dst, nonce, encrypted, nil)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to decrypt: %w", err)
|
||||
}
|
||||
|
||||
@@ -2,8 +2,11 @@ package crypto
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"crypto/rand"
|
||||
"errors"
|
||||
"testing"
|
||||
|
||||
"golang.org/x/crypto/chacha20poly1305"
|
||||
)
|
||||
|
||||
func TestNewCipherRejectsWrongKeySize(t *testing.T) {
|
||||
@@ -48,3 +51,117 @@ func TestDecryptRejectsShortCiphertext(t *testing.T) {
|
||||
t.Fatalf("Decrypt() error = %v, want %v", err, ErrCiphertextTooShort)
|
||||
}
|
||||
}
|
||||
|
||||
// TestEncryptUniqueNonces ensures the deterministic-nonce optimisation
|
||||
// never repeats a nonce within a single Cipher instance: the salt is
|
||||
// fixed but the counter must move every call.
|
||||
func TestEncryptUniqueNonces(t *testing.T) {
|
||||
c, err := NewCipher("01234567890123456789012345678901")
|
||||
if err != nil {
|
||||
t.Fatalf("NewCipher() error = %v", err)
|
||||
}
|
||||
|
||||
const iterations = 1024
|
||||
nonceSize := chacha20poly1305.NonceSizeX
|
||||
seen := make(map[string]struct{}, iterations)
|
||||
for i := 0; i < iterations; i++ {
|
||||
ct, err := c.Encrypt([]byte("payload"))
|
||||
if err != nil {
|
||||
t.Fatalf("Encrypt() error = %v", err)
|
||||
}
|
||||
nonce := string(ct[:nonceSize])
|
||||
if _, dup := seen[nonce]; dup {
|
||||
t.Fatalf("nonce repeated at iteration %d", i)
|
||||
}
|
||||
seen[nonce] = struct{}{}
|
||||
}
|
||||
}
|
||||
|
||||
// TestCipherInstancesDistinctSalts confirms two Cipher instances built
|
||||
// from the same key still produce different nonce salts, so they cannot
|
||||
// collide on (key, nonce) even at counter==1.
|
||||
func TestCipherInstancesDistinctSalts(t *testing.T) {
|
||||
const key = "01234567890123456789012345678901"
|
||||
a, err := NewCipher(key)
|
||||
if err != nil {
|
||||
t.Fatalf("NewCipher(a) error = %v", err)
|
||||
}
|
||||
b, err := NewCipher(key)
|
||||
if err != nil {
|
||||
t.Fatalf("NewCipher(b) error = %v", err)
|
||||
}
|
||||
if bytes.Equal(a.salt[:], b.salt[:]) {
|
||||
t.Fatal("two Cipher instances produced the same nonce salt")
|
||||
}
|
||||
}
|
||||
|
||||
// TestDecryptAcceptsLegacyRandomNonce verifies the new Cipher can still
|
||||
// decrypt ciphertexts produced by the previous fully-random-nonce
|
||||
// implementation. This guarantees rolling upgrade safety: a peer running
|
||||
// the old code can talk to one running the new code in either direction.
|
||||
func TestDecryptAcceptsLegacyRandomNonce(t *testing.T) {
|
||||
const key = "01234567890123456789012345678901"
|
||||
c, err := NewCipher(key)
|
||||
if err != nil {
|
||||
t.Fatalf("NewCipher() error = %v", err)
|
||||
}
|
||||
|
||||
// Reproduce the legacy encryption path inline (random nonce, no salt
|
||||
// or counter) using the same AEAD primitive.
|
||||
aead, err := chacha20poly1305.NewX([]byte(key))
|
||||
if err != nil {
|
||||
t.Fatalf("aead error = %v", err)
|
||||
}
|
||||
nonce := make([]byte, aead.NonceSize())
|
||||
if _, err := rand.Read(nonce); err != nil {
|
||||
t.Fatalf("rand.Read() error = %v", err)
|
||||
}
|
||||
plaintext := []byte("legacy peer payload")
|
||||
legacy := aead.Seal(nonce, nonce, plaintext, nil)
|
||||
|
||||
got, err := c.Decrypt(legacy)
|
||||
if err != nil {
|
||||
t.Fatalf("Decrypt(legacy) error = %v", err)
|
||||
}
|
||||
if !bytes.Equal(got, plaintext) {
|
||||
t.Fatalf("Decrypt(legacy) = %q, want %q", got, plaintext)
|
||||
}
|
||||
}
|
||||
|
||||
// BenchmarkEncrypt covers the data-plane hot path: many encrypts of a
|
||||
// typical smux frame size. Run with `go test -bench=Encrypt
|
||||
// -benchmem ./internal/crypto` to compare against the previous
|
||||
// implementation.
|
||||
func BenchmarkEncrypt(b *testing.B) {
|
||||
c, err := NewCipher("01234567890123456789012345678901")
|
||||
if err != nil {
|
||||
b.Fatalf("NewCipher() error = %v", err)
|
||||
}
|
||||
payload := bytes.Repeat([]byte{0xab}, 12*1024)
|
||||
b.ResetTimer()
|
||||
b.SetBytes(int64(len(payload)))
|
||||
for i := 0; i < b.N; i++ {
|
||||
if _, err := c.Encrypt(payload); err != nil {
|
||||
b.Fatalf("Encrypt() error = %v", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkDecrypt(b *testing.B) {
|
||||
c, err := NewCipher("01234567890123456789012345678901")
|
||||
if err != nil {
|
||||
b.Fatalf("NewCipher() error = %v", err)
|
||||
}
|
||||
payload := bytes.Repeat([]byte{0xab}, 12*1024)
|
||||
ct, err := c.Encrypt(payload)
|
||||
if err != nil {
|
||||
b.Fatalf("Encrypt() error = %v", err)
|
||||
}
|
||||
b.ResetTimer()
|
||||
b.SetBytes(int64(len(payload)))
|
||||
for i := 0; i < b.N; i++ {
|
||||
if _, err := c.Decrypt(ct); err != nil {
|
||||
b.Fatalf("Decrypt() error = %v", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -7,8 +7,9 @@
|
||||
// on the peer. smux operates on a pure byte stream (header + payload may be
|
||||
// glued or split across reads). We bridge by:
|
||||
//
|
||||
// - Treating each Push as an opaque chunk appended to an internal byte
|
||||
// buffer that Read drains in arbitrary slices.
|
||||
// - Treating each Push as an opaque chunk handed off via a channel that
|
||||
// Read drains in arbitrary slices, retaining any tail bytes that did
|
||||
// not fit the caller's buffer for the next Read.
|
||||
// - Letting smux's sendLoop call Write once per frame; we encrypt and hand
|
||||
// the whole buffer to the link as a single message. Length boundaries
|
||||
// are preserved end-to-end by the transport (KCP length-prefix framing
|
||||
@@ -21,6 +22,7 @@ import (
|
||||
"io"
|
||||
"runtime"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/openlibrecommunity/olcrtc/internal/crypto"
|
||||
@@ -31,83 +33,211 @@ import (
|
||||
// ErrClosed is returned from Read/Write after the conn has been closed.
|
||||
var ErrClosed = errors.New("muxconn: closed")
|
||||
|
||||
const (
|
||||
// inboundQueue is the buffered capacity of the Push -> Read pipeline.
|
||||
// It absorbs short Read stalls without applying back-pressure to the
|
||||
// transport callback. Frames are typically smux-sized (well under
|
||||
// 16 KiB), so 256 amounts to a few MiB of in-flight data, which is
|
||||
// enough for sustained throughput on every transport we have without
|
||||
// unbounded growth on a stuck reader.
|
||||
inboundQueue = 256
|
||||
|
||||
// pooledFrameCap is the capacity each pooled plaintext buffer is born
|
||||
// with. It is sized to fit the largest smux frame any of our
|
||||
// transports will deliver after AEAD overhead is stripped (datachannel
|
||||
// caps at 12 KiB on the wire, vp8channel at 60 KiB; we round up to
|
||||
// give Open room to write in place without growing the slice).
|
||||
pooledFrameCap = 64 * 1024
|
||||
)
|
||||
|
||||
// frameBufPool recycles plaintext buffers between Push (decrypts a wire
|
||||
// frame into a buffer) and Read (consumes the buffer fully then returns
|
||||
// it). It is global so all Conn instances share the same hot cache —
|
||||
// most clients in the same process talk to a handful of peers, and
|
||||
// per-Conn pools fragment the warm set unnecessarily.
|
||||
var frameBufPool = sync.Pool{ //nolint:gochecknoglobals // intentional process-wide buffer pool
|
||||
New: func() any {
|
||||
b := make([]byte, 0, pooledFrameCap)
|
||||
return &b
|
||||
},
|
||||
}
|
||||
|
||||
func acquireFrameBuf() *[]byte {
|
||||
bp := frameBufPool.Get().(*[]byte) //nolint:forcetypeassert // pool only ever holds *[]byte
|
||||
*bp = (*bp)[:0]
|
||||
return bp
|
||||
}
|
||||
|
||||
func releaseFrameBuf(bp *[]byte) {
|
||||
if bp == nil {
|
||||
return
|
||||
}
|
||||
// Drop oversized buffers so a one-off huge frame can't poison the
|
||||
// pool's working set forever.
|
||||
if cap(*bp) > pooledFrameCap*2 {
|
||||
return
|
||||
}
|
||||
*bp = (*bp)[:0]
|
||||
frameBufPool.Put(bp)
|
||||
}
|
||||
|
||||
// Conn is an io.ReadWriteCloser over a [transport.Transport] with optional AEAD wrapping.
|
||||
//
|
||||
// Push produces decrypted plaintext frames into an internal channel; Read
|
||||
// drains the channel and slices each frame across as many caller buffers
|
||||
// as needed. The hot path is lock-free: a single producer (the transport
|
||||
// callback) and a single consumer (smux's read loop) communicate via a
|
||||
// buffered channel without any cond/mutex ping-pong.
|
||||
//
|
||||
// Plaintext buffers are recycled through frameBufPool: Push borrows a
|
||||
// buffer to decrypt into, ships it through the channel, and Read returns
|
||||
// the buffer to the pool once its caller has consumed all the bytes.
|
||||
type Conn struct {
|
||||
ln transport.Transport
|
||||
send func([]byte) error
|
||||
cipher *crypto.Cipher
|
||||
|
||||
mu sync.Mutex
|
||||
cond *sync.Cond
|
||||
buf []byte
|
||||
closed bool
|
||||
in chan *[]byte
|
||||
closeOnce sync.Once
|
||||
closeCh chan struct{}
|
||||
closed atomic.Bool
|
||||
|
||||
// leftoverBuf holds the pool buffer whose tail is still in
|
||||
// `leftover`. When `leftover` empties we return leftoverBuf to the
|
||||
// pool and clear both fields. Touched only by Read.
|
||||
leftoverBuf *[]byte
|
||||
leftover []byte
|
||||
}
|
||||
|
||||
// New wires a Conn over the given transport. Push must be set as the
|
||||
// transport's OnData callback before this conn is used.
|
||||
func New(ln transport.Transport, cipher *crypto.Cipher) *Conn {
|
||||
c := &Conn{ln: ln, send: ln.Send, cipher: cipher}
|
||||
c.cond = sync.NewCond(&c.mu)
|
||||
return c
|
||||
return &Conn{
|
||||
ln: ln,
|
||||
send: ln.Send,
|
||||
cipher: cipher,
|
||||
in: make(chan *[]byte, inboundQueue),
|
||||
closeCh: make(chan struct{}),
|
||||
}
|
||||
}
|
||||
|
||||
// NewPeer wires a Conn whose writes are addressed to a specific transport peer.
|
||||
func NewPeer(ln transport.PeerTransport, cipher *crypto.Cipher, peerID string) *Conn {
|
||||
c := &Conn{
|
||||
return &Conn{
|
||||
ln: ln,
|
||||
send: func(data []byte) error {
|
||||
return ln.SendTo(peerID, data)
|
||||
},
|
||||
cipher: cipher,
|
||||
cipher: cipher,
|
||||
in: make(chan *[]byte, inboundQueue),
|
||||
closeCh: make(chan struct{}),
|
||||
}
|
||||
c.cond = sync.NewCond(&c.mu)
|
||||
return c
|
||||
}
|
||||
|
||||
// Reset clears any buffered inbound bytes, re-arms a closed conn for writes,
|
||||
// and unblocks pending Reads so the smux session on top of it exits cleanly.
|
||||
// Use it when the link stays up but the peer's smux session has been rebuilt:
|
||||
// the inbound byte stream (now indistinguishable random-looking data) must be
|
||||
// parsed by the fresh smux state, not the old one.
|
||||
func (c *Conn) Reset() {
|
||||
c.mu.Lock()
|
||||
c.buf = nil
|
||||
c.closed = false
|
||||
c.cond.Broadcast()
|
||||
c.mu.Unlock()
|
||||
}
|
||||
|
||||
// Push hands an encrypted wire payload (one OnData event) to the conn.
|
||||
//
|
||||
// On the producer side: borrow a pooled plaintext buffer, decrypt into
|
||||
// it, then either deliver via the inbound channel or, if the caller has
|
||||
// Close'd, return the buffer to the pool. Blocking forever on a wedged
|
||||
// reader would wedge the transport callback and trip its watchdog, so we
|
||||
// also bail on closeCh.
|
||||
func (c *Conn) Push(ciphertext []byte) {
|
||||
pt, err := c.cipher.Decrypt(ciphertext)
|
||||
bufPtr := acquireFrameBuf()
|
||||
pt, err := c.cipher.DecryptInto(*bufPtr, ciphertext)
|
||||
if err != nil {
|
||||
releaseFrameBuf(bufPtr)
|
||||
logger.Debugf("muxconn: decrypt failed, dropping frame: %v", err)
|
||||
return
|
||||
}
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
if c.closed {
|
||||
*bufPtr = pt
|
||||
if c.closed.Load() {
|
||||
releaseFrameBuf(bufPtr)
|
||||
return
|
||||
}
|
||||
c.buf = append(c.buf, pt...)
|
||||
c.cond.Broadcast()
|
||||
select {
|
||||
case c.in <- bufPtr:
|
||||
case <-c.closeCh:
|
||||
releaseFrameBuf(bufPtr)
|
||||
}
|
||||
}
|
||||
|
||||
// Read implements io.Reader. Blocks until at least one byte is available.
|
||||
// Read implements io.Reader. Blocks until at least one byte is available;
|
||||
// after that, drains additional ready frames non-blockingly to fill p, so
|
||||
// a single Read can absorb several queued frames in one go. This matches
|
||||
// the prior cond/append-based implementation's concatenation behaviour
|
||||
// and lets smux's bufio reader pull large chunks at a time.
|
||||
func (c *Conn) Read(p []byte) (int, error) {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
for !c.closed && len(c.buf) == 0 {
|
||||
c.cond.Wait()
|
||||
if len(p) == 0 {
|
||||
return 0, nil
|
||||
}
|
||||
if len(c.buf) == 0 {
|
||||
return 0, io.EOF
|
||||
if len(c.leftover) == 0 {
|
||||
bufPtr, ok := c.takeFrame()
|
||||
if !ok {
|
||||
return 0, io.EOF
|
||||
}
|
||||
c.leftoverBuf = bufPtr
|
||||
c.leftover = *bufPtr
|
||||
}
|
||||
n := copy(p, c.leftover)
|
||||
c.leftover = c.leftover[n:]
|
||||
c.recycleIfDrained()
|
||||
|
||||
// Greedily pull additional frames already sitting in the queue,
|
||||
// without blocking. This keeps the channel from accumulating a
|
||||
// backlog when the consumer asks for a large buffer.
|
||||
for n < len(p) && len(c.leftover) == 0 {
|
||||
select {
|
||||
case bufPtr, ok := <-c.in:
|
||||
if !ok {
|
||||
return n, nil
|
||||
}
|
||||
data := *bufPtr
|
||||
m := copy(p[n:], data)
|
||||
n += m
|
||||
if m < len(data) {
|
||||
c.leftoverBuf = bufPtr
|
||||
c.leftover = data[m:]
|
||||
} else {
|
||||
releaseFrameBuf(bufPtr)
|
||||
}
|
||||
default:
|
||||
return n, nil
|
||||
}
|
||||
}
|
||||
n := copy(p, c.buf)
|
||||
c.buf = c.buf[n:]
|
||||
return n, nil
|
||||
}
|
||||
|
||||
// takeFrame blocks until a frame is available or the conn is closed.
|
||||
// On a clean close it still drains any frame that landed before the
|
||||
// close signal won the race, so a peer that shuts us down right after a
|
||||
// final write doesn't lose data.
|
||||
func (c *Conn) takeFrame() (*[]byte, bool) {
|
||||
select {
|
||||
case bufPtr, ok := <-c.in:
|
||||
if !ok {
|
||||
return nil, false
|
||||
}
|
||||
return bufPtr, true
|
||||
case <-c.closeCh:
|
||||
select {
|
||||
case bufPtr, ok := <-c.in:
|
||||
if !ok {
|
||||
return nil, false
|
||||
}
|
||||
return bufPtr, true
|
||||
default:
|
||||
return nil, false
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Conn) recycleIfDrained() {
|
||||
if len(c.leftover) == 0 && c.leftoverBuf != nil {
|
||||
releaseFrameBuf(c.leftoverBuf)
|
||||
c.leftoverBuf = nil
|
||||
}
|
||||
}
|
||||
|
||||
// Write encrypts p and ships it to the link as a single message. Blocks while
|
||||
// the link signals back-pressure.
|
||||
func (c *Conn) Write(p []byte) (int, error) {
|
||||
@@ -120,7 +250,7 @@ func (c *Conn) Write(p []byte) (int, error) {
|
||||
slowPollDelay = 2 * time.Millisecond
|
||||
)
|
||||
for attempt := 0; ; attempt++ {
|
||||
if c.isClosed() {
|
||||
if c.closed.Load() {
|
||||
return 0, ErrClosed
|
||||
}
|
||||
if c.ln.CanSend() {
|
||||
@@ -145,18 +275,9 @@ func (c *Conn) Write(p []byte) (int, error) {
|
||||
|
||||
// Close unblocks any pending Read with io.EOF.
|
||||
func (c *Conn) Close() error {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
if c.closed {
|
||||
return nil
|
||||
}
|
||||
c.closed = true
|
||||
c.cond.Broadcast()
|
||||
c.closeOnce.Do(func() {
|
||||
c.closed.Store(true)
|
||||
close(c.closeCh)
|
||||
})
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *Conn) isClosed() bool {
|
||||
c.mu.Lock()
|
||||
defer c.mu.Unlock()
|
||||
return c.closed
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user