mirror of
https://github.com/openlibrecommunity/olcrtc.git
synced 2026-05-31 09:29:45 +00:00
feat(mux,client,server): Add sequence numbering and out-of-order packet handling
This commit is contained in:
@@ -124,10 +124,11 @@ func Run(ctx context.Context, roomURL, keyHex string, socksPort int, duo bool) e
|
||||
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
|
||||
resetFrame := make([]byte, 8)
|
||||
resetFrame := make([]byte, 12)
|
||||
binary.BigEndian.PutUint32(resetFrame[0:4], c.clientID)
|
||||
binary.BigEndian.PutUint16(resetFrame[4:6], 0xFFFF)
|
||||
binary.BigEndian.PutUint16(resetFrame[6:8], 0xFFFF)
|
||||
binary.BigEndian.PutUint32(resetFrame[8:12], 0)
|
||||
encrypted, _ := cipher.Encrypt(resetFrame)
|
||||
|
||||
for _, peer := range c.peers {
|
||||
|
||||
@@ -12,11 +12,13 @@ import (
|
||||
)
|
||||
|
||||
type Stream struct {
|
||||
ID uint16
|
||||
ClientID uint32
|
||||
recvBuf []byte
|
||||
closed bool
|
||||
mu sync.Mutex
|
||||
ID uint16
|
||||
ClientID uint32
|
||||
recvBuf []byte
|
||||
closed bool
|
||||
mu sync.Mutex
|
||||
nextSeq uint32
|
||||
outOfOrder map[uint32][]byte
|
||||
}
|
||||
|
||||
func (s *Stream) RecvBuf() []byte {
|
||||
@@ -35,6 +37,8 @@ type Multiplexer struct {
|
||||
maxBufferSize int
|
||||
dataReady map[uint16]chan struct{}
|
||||
dataReadyMu sync.Mutex
|
||||
sendSeq map[uint16]uint32
|
||||
sendSeqMu sync.Mutex
|
||||
}
|
||||
|
||||
func New(clientID uint32, onSend func([]byte) error) *Multiplexer {
|
||||
@@ -46,6 +50,7 @@ func New(clientID uint32, onSend func([]byte) error) *Multiplexer {
|
||||
maxStreams: 10000,
|
||||
maxBufferSize: 1024 * 1024,
|
||||
dataReady: make(map[uint16]chan struct{}),
|
||||
sendSeq: make(map[uint16]uint32),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -90,11 +95,18 @@ func (m *Multiplexer) SendData(sid uint16, data []byte) error {
|
||||
}
|
||||
|
||||
chunk := data[i:end]
|
||||
frame := make([]byte, 8+len(chunk))
|
||||
|
||||
m.sendSeqMu.Lock()
|
||||
seq := m.sendSeq[sid]
|
||||
m.sendSeq[sid]++
|
||||
m.sendSeqMu.Unlock()
|
||||
|
||||
frame := make([]byte, 12+len(chunk))
|
||||
binary.BigEndian.PutUint32(frame[0:4], m.clientID)
|
||||
binary.BigEndian.PutUint16(frame[4:6], sid)
|
||||
binary.BigEndian.PutUint16(frame[6:8], uint16(len(chunk)))
|
||||
copy(frame[8:], chunk)
|
||||
binary.BigEndian.PutUint32(frame[8:12], seq)
|
||||
copy(frame[12:], chunk)
|
||||
|
||||
if err := m.onSend(frame); err != nil {
|
||||
return err
|
||||
@@ -111,23 +123,45 @@ func (m *Multiplexer) CloseStream(sid uint16) error {
|
||||
if stream, exists := m.streams[sid]; exists {
|
||||
stream.closed = true
|
||||
}
|
||||
|
||||
m.sendSeqMu.Lock()
|
||||
delete(m.sendSeq, sid)
|
||||
m.sendSeqMu.Unlock()
|
||||
|
||||
frame := make([]byte, 8)
|
||||
frame := make([]byte, 12)
|
||||
binary.BigEndian.PutUint32(frame[0:4], m.clientID)
|
||||
binary.BigEndian.PutUint16(frame[4:6], sid)
|
||||
binary.BigEndian.PutUint16(frame[6:8], 0)
|
||||
binary.BigEndian.PutUint32(frame[8:12], 0)
|
||||
|
||||
return m.onSend(frame)
|
||||
}
|
||||
|
||||
func (m *Multiplexer) HandleFrame(frame []byte) {
|
||||
if len(frame) < 8 {
|
||||
if len(frame) < 12 {
|
||||
if len(frame) >= 8 {
|
||||
clientID := binary.BigEndian.Uint32(frame[0:4])
|
||||
sid := binary.BigEndian.Uint16(frame[4:6])
|
||||
length := binary.BigEndian.Uint16(frame[6:8])
|
||||
|
||||
if sid == 0xFFFF && length == 0xFFFF {
|
||||
m.mu.Lock()
|
||||
for streamSid, stream := range m.streams {
|
||||
if stream.ClientID == clientID {
|
||||
stream.closed = true
|
||||
delete(m.streams, streamSid)
|
||||
}
|
||||
}
|
||||
m.mu.Unlock()
|
||||
}
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
clientID := binary.BigEndian.Uint32(frame[0:4])
|
||||
sid := binary.BigEndian.Uint16(frame[4:6])
|
||||
length := binary.BigEndian.Uint16(frame[6:8])
|
||||
seq := binary.BigEndian.Uint32(frame[8:12])
|
||||
|
||||
if sid == 0xFFFF && length == 0xFFFF {
|
||||
m.mu.Lock()
|
||||
@@ -150,11 +184,11 @@ func (m *Multiplexer) HandleFrame(frame []byte) {
|
||||
return
|
||||
}
|
||||
|
||||
if len(frame) < 8+int(length) {
|
||||
if len(frame) < 12+int(length) {
|
||||
return
|
||||
}
|
||||
|
||||
data := frame[8 : 8+length]
|
||||
data := frame[12 : 12+length]
|
||||
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
@@ -165,32 +199,56 @@ func (m *Multiplexer) HandleFrame(frame []byte) {
|
||||
return
|
||||
}
|
||||
stream = &Stream{
|
||||
ID: sid,
|
||||
ClientID: clientID,
|
||||
recvBuf: make([]byte, 0),
|
||||
ID: sid,
|
||||
ClientID: clientID,
|
||||
recvBuf: make([]byte, 0),
|
||||
nextSeq: 0,
|
||||
outOfOrder: make(map[uint32][]byte),
|
||||
}
|
||||
m.streams[sid] = stream
|
||||
} else if stream.ClientID != clientID {
|
||||
stream.ClientID = clientID
|
||||
stream.recvBuf = make([]byte, 0)
|
||||
stream.closed = false
|
||||
stream.nextSeq = 0
|
||||
stream.outOfOrder = make(map[uint32][]byte)
|
||||
}
|
||||
|
||||
if len(stream.recvBuf)+len(data) > m.maxBufferSize {
|
||||
stream.closed = true
|
||||
return
|
||||
}
|
||||
|
||||
stream.recvBuf = append(stream.recvBuf, data...)
|
||||
|
||||
m.dataReadyMu.Lock()
|
||||
if ch, ok := m.dataReady[sid]; ok {
|
||||
select {
|
||||
case ch <- struct{}{}:
|
||||
default:
|
||||
if seq == stream.nextSeq {
|
||||
if len(stream.recvBuf)+len(data) > m.maxBufferSize {
|
||||
stream.closed = true
|
||||
return
|
||||
}
|
||||
stream.recvBuf = append(stream.recvBuf, data...)
|
||||
stream.nextSeq++
|
||||
|
||||
for {
|
||||
if nextData, ok := stream.outOfOrder[stream.nextSeq]; ok {
|
||||
if len(stream.recvBuf)+len(nextData) > m.maxBufferSize {
|
||||
stream.closed = true
|
||||
return
|
||||
}
|
||||
stream.recvBuf = append(stream.recvBuf, nextData...)
|
||||
delete(stream.outOfOrder, stream.nextSeq)
|
||||
stream.nextSeq++
|
||||
} else {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
m.dataReadyMu.Lock()
|
||||
if ch, ok := m.dataReady[sid]; ok {
|
||||
select {
|
||||
case ch <- struct{}{}:
|
||||
default:
|
||||
}
|
||||
}
|
||||
m.dataReadyMu.Unlock()
|
||||
} else if seq > stream.nextSeq {
|
||||
if len(stream.outOfOrder) < 100 {
|
||||
stream.outOfOrder[seq] = append([]byte(nil), data...)
|
||||
}
|
||||
}
|
||||
m.dataReadyMu.Unlock()
|
||||
}
|
||||
|
||||
func (m *Multiplexer) ReadStream(sid uint16) []byte {
|
||||
@@ -242,6 +300,10 @@ func (m *Multiplexer) Reset() {
|
||||
|
||||
m.streams = make(map[uint16]*Stream)
|
||||
m.nextID = 1
|
||||
|
||||
m.sendSeqMu.Lock()
|
||||
m.sendSeq = make(map[uint16]uint32)
|
||||
m.sendSeqMu.Unlock()
|
||||
}
|
||||
|
||||
func (m *Multiplexer) UpdateSendFunc(onSend func([]byte) error) {
|
||||
|
||||
@@ -154,7 +154,26 @@ func (s *Server) onData(data []byte) {
|
||||
|
||||
logger.Verbose("Received %d bytes from client", len(plaintext))
|
||||
|
||||
if len(plaintext) >= 8 {
|
||||
if len(plaintext) >= 12 {
|
||||
clientID := binary.BigEndian.Uint32(plaintext[0:4])
|
||||
sid := binary.BigEndian.Uint16(plaintext[4:6])
|
||||
length := binary.BigEndian.Uint16(plaintext[6:8])
|
||||
|
||||
if sid == 0xFFFF && length == 0xFFFF {
|
||||
log.Printf("Received reset signal from client (clientID=%d) - cleaning up", clientID)
|
||||
s.connMu.Lock()
|
||||
for streamSid, conn := range s.connections {
|
||||
stream := s.mux.GetStream(streamSid)
|
||||
if stream != nil && stream.ClientID == clientID {
|
||||
if conn != nil {
|
||||
conn.Close()
|
||||
}
|
||||
delete(s.connections, streamSid)
|
||||
}
|
||||
}
|
||||
s.connMu.Unlock()
|
||||
}
|
||||
} else if len(plaintext) >= 8 {
|
||||
clientID := binary.BigEndian.Uint32(plaintext[0:4])
|
||||
sid := binary.BigEndian.Uint16(plaintext[4:6])
|
||||
length := binary.BigEndian.Uint16(plaintext[6:8])
|
||||
|
||||
Reference in New Issue
Block a user