From 302b249c0cb24f7a6f26ad76db4f7900ac6d7749 Mon Sep 17 00:00:00 2001 From: zarazaex69 Date: Thu, 9 Apr 2026 16:50:14 +0300 Subject: [PATCH] feat(mux,client,server): Add client ID tracking for multiplexed streams --- internal/client/client.go | 19 ++++++---- internal/mux/mux.go | 80 +++++++++++++++++++++++---------------- internal/server/server.go | 22 ++++++----- 3 files changed, 73 insertions(+), 48 deletions(-) diff --git a/internal/client/client.go b/internal/client/client.go index 37c604b..f1fe668 100644 --- a/internal/client/client.go +++ b/internal/client/client.go @@ -20,9 +20,10 @@ import ( ) type Client struct { - peer *telemost.Peer - cipher *crypto.Cipher - mux *mux.Multiplexer + peer *telemost.Peer + cipher *crypto.Cipher + mux *mux.Multiplexer + clientID uint32 } func Run(roomURL, keyHex string, socksPort int) error { @@ -47,11 +48,14 @@ func Run(roomURL, keyHex string, socksPort int) error { return err } + clientID := uint32(time.Now().UnixNano() & 0xFFFFFFFF) + c := &Client{ - cipher: cipher, + cipher: cipher, + clientID: clientID, } - c.mux = mux.New(func(frame []byte) error { + c.mux = mux.New(c.clientID, func(frame []byte) error { encrypted, err := c.cipher.Encrypt(frame) if err != nil { return err @@ -90,12 +94,13 @@ func Run(roomURL, keyHex string, socksPort int) error { time.Sleep(100 * time.Millisecond) - resetFrame := make([]byte, 4) + resetFrame := make([]byte, 8) binary.BigEndian.PutUint16(resetFrame[0:2], 0xFFFF) binary.BigEndian.PutUint16(resetFrame[2:4], 0xFFFF) + binary.BigEndian.PutUint32(resetFrame[4:8], c.clientID) encrypted, _ := cipher.Encrypt(resetFrame) peer.Send(encrypted) - log.Println("Sent reset signal to server") + log.Printf("Sent reset signal to server (clientID=%d)", c.clientID) go peer.WatchConnection(ctx) diff --git a/internal/mux/mux.go b/internal/mux/mux.go index 1895487..64f49df 100644 --- a/internal/mux/mux.go +++ b/internal/mux/mux.go @@ -10,24 +10,27 @@ import ( ) type Stream struct { - ID uint16 - recvBuf []byte - closed bool - mu sync.Mutex + ID uint16 + ClientID uint32 + recvBuf []byte + closed bool + mu sync.Mutex } type Multiplexer struct { - streams map[uint16]*Stream - nextID uint16 - onSend func([]byte) error - mu sync.RWMutex + streams map[uint16]*Stream + nextID uint16 + clientID uint32 + onSend func([]byte) error + mu sync.RWMutex } -func New(onSend func([]byte) error) *Multiplexer { +func New(clientID uint32, onSend func([]byte) error) *Multiplexer { return &Multiplexer{ - streams: make(map[uint16]*Stream), - nextID: 1, - onSend: onSend, + streams: make(map[uint16]*Stream), + nextID: 1, + clientID: clientID, + onSend: onSend, } } @@ -63,10 +66,11 @@ func (m *Multiplexer) SendData(sid uint16, data []byte) error { } chunk := data[i:end] - frame := make([]byte, 4+len(chunk)) - binary.BigEndian.PutUint16(frame[0:2], sid) - binary.BigEndian.PutUint16(frame[2:4], uint16(len(chunk))) - copy(frame[4:], chunk) + frame := make([]byte, 8+len(chunk)) + binary.BigEndian.PutUint32(frame[0:4], m.clientID) + binary.BigEndian.PutUint16(frame[4:6], sid) + binary.BigEndian.PutUint16(frame[6:8], uint16(len(chunk))) + copy(frame[8:], chunk) if err := m.onSend(frame); err != nil { return err @@ -84,57 +88,63 @@ func (m *Multiplexer) CloseStream(sid uint16) error { stream.closed = true } - frame := make([]byte, 4) - binary.BigEndian.PutUint16(frame[0:2], sid) - binary.BigEndian.PutUint16(frame[2:4], 0) + frame := make([]byte, 8) + binary.BigEndian.PutUint32(frame[0:4], m.clientID) + binary.BigEndian.PutUint16(frame[4:6], sid) + binary.BigEndian.PutUint16(frame[6:8], 0) return m.onSend(frame) } func (m *Multiplexer) HandleFrame(frame []byte) { - if len(frame) < 4 { + if len(frame) < 8 { return } - sid := binary.BigEndian.Uint16(frame[0:2]) - length := binary.BigEndian.Uint16(frame[2:4]) + clientID := binary.BigEndian.Uint32(frame[0:4]) + sid := binary.BigEndian.Uint16(frame[4:6]) + length := binary.BigEndian.Uint16(frame[6:8]) if sid == 0xFFFF && length == 0xFFFF { m.mu.Lock() - for _, stream := range m.streams { - stream.closed = true + for streamSid, stream := range m.streams { + if stream.ClientID == clientID { + stream.closed = true + delete(m.streams, streamSid) + } } - m.streams = make(map[uint16]*Stream) - m.nextID = 1 m.mu.Unlock() return } if length == 0 { m.mu.Lock() - if stream, exists := m.streams[sid]; exists { + if stream, exists := m.streams[sid]; exists && stream.ClientID == clientID { stream.closed = true } m.mu.Unlock() return } - if len(frame) < 4+int(length) { + if len(frame) < 8+int(length) { return } - data := frame[4 : 4+length] + data := frame[8 : 8+length] m.mu.Lock() stream, exists := m.streams[sid] if !exists { stream = &Stream{ - ID: sid, - recvBuf: make([]byte, 0), + ID: sid, + ClientID: clientID, + recvBuf: make([]byte, 0), } m.streams[sid] = stream } - stream.recvBuf = append(stream.recvBuf, data...) + if stream.ClientID == clientID { + stream.recvBuf = append(stream.recvBuf, data...) + } m.mu.Unlock() } @@ -171,6 +181,12 @@ func (m *Multiplexer) GetStreams() []uint16 { return sids } +func (m *Multiplexer) GetStream(sid uint16) *Stream { + m.mu.RLock() + defer m.mu.RUnlock() + return m.streams[sid] +} + func (m *Multiplexer) Reset() { m.mu.Lock() defer m.mu.Unlock() diff --git a/internal/server/server.go b/internal/server/server.go index c7be0c8..b703472 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -60,7 +60,7 @@ func Run(roomURL, keyHex string) error { connections: make(map[uint16]net.Conn), } - s.mux = mux.New(func(frame []byte) error { + s.mux = mux.New(0, func(frame []byte) error { encrypted, err := s.cipher.Encrypt(frame) if err != nil { return err @@ -119,18 +119,22 @@ func (s *Server) onData(data []byte) { return } - if len(plaintext) >= 4 { - sid := binary.BigEndian.Uint16(plaintext[0:2]) - length := binary.BigEndian.Uint16(plaintext[2:4]) + if len(plaintext) >= 8 { + clientID := binary.BigEndian.Uint32(plaintext[0:4]) + sid := binary.BigEndian.Uint16(plaintext[4:6]) + length := binary.BigEndian.Uint16(plaintext[6:8]) if sid == 0xFFFF && length == 0xFFFF { - log.Println("Received reset signal from client - cleaning up") + log.Printf("Received reset signal from client (clientID=%d) - cleaning up", clientID) s.connMu.Lock() - for sid, conn := range s.connections { - if conn != nil { - conn.Close() + for streamSid, conn := range s.connections { + stream := s.mux.GetStream(streamSid) + if stream != nil && stream.ClientID == clientID { + if conn != nil { + conn.Close() + } + delete(s.connections, streamSid) } - delete(s.connections, sid) } s.connMu.Unlock() }