From 498e95620250d8100c9b283ce3afd583f2f9ca3f Mon Sep 17 00:00:00 2001 From: zarazaex69 Date: Thu, 9 Apr 2026 19:27:35 +0300 Subject: [PATCH] feat(mux,client,server): Add sequence numbering and out-of-order packet handling --- internal/client/client.go | 3 +- internal/mux/mux.go | 116 +++++++++++++++++++++++++++++--------- internal/server/server.go | 21 ++++++- 3 files changed, 111 insertions(+), 29 deletions(-) diff --git a/internal/client/client.go b/internal/client/client.go index dc45e08..a0b2875 100644 --- a/internal/client/client.go +++ b/internal/client/client.go @@ -124,10 +124,11 @@ func Run(ctx context.Context, roomURL, keyHex string, socksPort int, duo bool) e time.Sleep(100 * time.Millisecond) - resetFrame := make([]byte, 8) + resetFrame := make([]byte, 12) binary.BigEndian.PutUint32(resetFrame[0:4], c.clientID) binary.BigEndian.PutUint16(resetFrame[4:6], 0xFFFF) binary.BigEndian.PutUint16(resetFrame[6:8], 0xFFFF) + binary.BigEndian.PutUint32(resetFrame[8:12], 0) encrypted, _ := cipher.Encrypt(resetFrame) for _, peer := range c.peers { diff --git a/internal/mux/mux.go b/internal/mux/mux.go index 6f17f37..9bdaa62 100644 --- a/internal/mux/mux.go +++ b/internal/mux/mux.go @@ -12,11 +12,13 @@ import ( ) type Stream struct { - ID uint16 - ClientID uint32 - recvBuf []byte - closed bool - mu sync.Mutex + ID uint16 + ClientID uint32 + recvBuf []byte + closed bool + mu sync.Mutex + nextSeq uint32 + outOfOrder map[uint32][]byte } func (s *Stream) RecvBuf() []byte { @@ -35,6 +37,8 @@ type Multiplexer struct { maxBufferSize int dataReady map[uint16]chan struct{} dataReadyMu sync.Mutex + sendSeq map[uint16]uint32 + sendSeqMu sync.Mutex } func New(clientID uint32, onSend func([]byte) error) *Multiplexer { @@ -46,6 +50,7 @@ func New(clientID uint32, onSend func([]byte) error) *Multiplexer { maxStreams: 10000, maxBufferSize: 1024 * 1024, dataReady: make(map[uint16]chan struct{}), + sendSeq: make(map[uint16]uint32), } } @@ -90,11 +95,18 @@ func (m *Multiplexer) SendData(sid uint16, data []byte) error { } chunk := data[i:end] - frame := make([]byte, 8+len(chunk)) + + m.sendSeqMu.Lock() + seq := m.sendSeq[sid] + m.sendSeq[sid]++ + m.sendSeqMu.Unlock() + + frame := make([]byte, 12+len(chunk)) binary.BigEndian.PutUint32(frame[0:4], m.clientID) binary.BigEndian.PutUint16(frame[4:6], sid) binary.BigEndian.PutUint16(frame[6:8], uint16(len(chunk))) - copy(frame[8:], chunk) + binary.BigEndian.PutUint32(frame[8:12], seq) + copy(frame[12:], chunk) if err := m.onSend(frame); err != nil { return err @@ -111,23 +123,45 @@ func (m *Multiplexer) CloseStream(sid uint16) error { if stream, exists := m.streams[sid]; exists { stream.closed = true } + + m.sendSeqMu.Lock() + delete(m.sendSeq, sid) + m.sendSeqMu.Unlock() - frame := make([]byte, 8) + frame := make([]byte, 12) binary.BigEndian.PutUint32(frame[0:4], m.clientID) binary.BigEndian.PutUint16(frame[4:6], sid) binary.BigEndian.PutUint16(frame[6:8], 0) + binary.BigEndian.PutUint32(frame[8:12], 0) return m.onSend(frame) } func (m *Multiplexer) HandleFrame(frame []byte) { - if len(frame) < 8 { + if len(frame) < 12 { + if len(frame) >= 8 { + clientID := binary.BigEndian.Uint32(frame[0:4]) + sid := binary.BigEndian.Uint16(frame[4:6]) + length := binary.BigEndian.Uint16(frame[6:8]) + + if sid == 0xFFFF && length == 0xFFFF { + m.mu.Lock() + for streamSid, stream := range m.streams { + if stream.ClientID == clientID { + stream.closed = true + delete(m.streams, streamSid) + } + } + m.mu.Unlock() + } + } return } clientID := binary.BigEndian.Uint32(frame[0:4]) sid := binary.BigEndian.Uint16(frame[4:6]) length := binary.BigEndian.Uint16(frame[6:8]) + seq := binary.BigEndian.Uint32(frame[8:12]) if sid == 0xFFFF && length == 0xFFFF { m.mu.Lock() @@ -150,11 +184,11 @@ func (m *Multiplexer) HandleFrame(frame []byte) { return } - if len(frame) < 8+int(length) { + if len(frame) < 12+int(length) { return } - data := frame[8 : 8+length] + data := frame[12 : 12+length] m.mu.Lock() defer m.mu.Unlock() @@ -165,32 +199,56 @@ func (m *Multiplexer) HandleFrame(frame []byte) { return } stream = &Stream{ - ID: sid, - ClientID: clientID, - recvBuf: make([]byte, 0), + ID: sid, + ClientID: clientID, + recvBuf: make([]byte, 0), + nextSeq: 0, + outOfOrder: make(map[uint32][]byte), } m.streams[sid] = stream } else if stream.ClientID != clientID { stream.ClientID = clientID stream.recvBuf = make([]byte, 0) stream.closed = false + stream.nextSeq = 0 + stream.outOfOrder = make(map[uint32][]byte) } - if len(stream.recvBuf)+len(data) > m.maxBufferSize { - stream.closed = true - return - } - - stream.recvBuf = append(stream.recvBuf, data...) - - m.dataReadyMu.Lock() - if ch, ok := m.dataReady[sid]; ok { - select { - case ch <- struct{}{}: - default: + if seq == stream.nextSeq { + if len(stream.recvBuf)+len(data) > m.maxBufferSize { + stream.closed = true + return + } + stream.recvBuf = append(stream.recvBuf, data...) + stream.nextSeq++ + + for { + if nextData, ok := stream.outOfOrder[stream.nextSeq]; ok { + if len(stream.recvBuf)+len(nextData) > m.maxBufferSize { + stream.closed = true + return + } + stream.recvBuf = append(stream.recvBuf, nextData...) + delete(stream.outOfOrder, stream.nextSeq) + stream.nextSeq++ + } else { + break + } + } + + m.dataReadyMu.Lock() + if ch, ok := m.dataReady[sid]; ok { + select { + case ch <- struct{}{}: + default: + } + } + m.dataReadyMu.Unlock() + } else if seq > stream.nextSeq { + if len(stream.outOfOrder) < 100 { + stream.outOfOrder[seq] = append([]byte(nil), data...) } } - m.dataReadyMu.Unlock() } func (m *Multiplexer) ReadStream(sid uint16) []byte { @@ -242,6 +300,10 @@ func (m *Multiplexer) Reset() { m.streams = make(map[uint16]*Stream) m.nextID = 1 + + m.sendSeqMu.Lock() + m.sendSeq = make(map[uint16]uint32) + m.sendSeqMu.Unlock() } func (m *Multiplexer) UpdateSendFunc(onSend func([]byte) error) { diff --git a/internal/server/server.go b/internal/server/server.go index 660a508..e470db0 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -154,7 +154,26 @@ func (s *Server) onData(data []byte) { logger.Verbose("Received %d bytes from client", len(plaintext)) - if len(plaintext) >= 8 { + if len(plaintext) >= 12 { + clientID := binary.BigEndian.Uint32(plaintext[0:4]) + sid := binary.BigEndian.Uint16(plaintext[4:6]) + length := binary.BigEndian.Uint16(plaintext[6:8]) + + if sid == 0xFFFF && length == 0xFFFF { + log.Printf("Received reset signal from client (clientID=%d) - cleaning up", clientID) + s.connMu.Lock() + for streamSid, conn := range s.connections { + stream := s.mux.GetStream(streamSid) + if stream != nil && stream.ClientID == clientID { + if conn != nil { + conn.Close() + } + delete(s.connections, streamSid) + } + } + s.connMu.Unlock() + } + } else if len(plaintext) >= 8 { clientID := binary.BigEndian.Uint32(plaintext[0:4]) sid := binary.BigEndian.Uint16(plaintext[4:6]) length := binary.BigEndian.Uint16(plaintext[6:8])