mirror of
https://github.com/openlibrecommunity/olcrtc.git
synced 2026-06-07 12:54:43 +00:00
feat(mux,client,server): Add client ID tracking for multiplexed streams
This commit is contained in:
@@ -20,9 +20,10 @@ import (
|
||||
)
|
||||
|
||||
type Client struct {
|
||||
peer *telemost.Peer
|
||||
cipher *crypto.Cipher
|
||||
mux *mux.Multiplexer
|
||||
peer *telemost.Peer
|
||||
cipher *crypto.Cipher
|
||||
mux *mux.Multiplexer
|
||||
clientID uint32
|
||||
}
|
||||
|
||||
func Run(roomURL, keyHex string, socksPort int) error {
|
||||
@@ -47,11 +48,14 @@ func Run(roomURL, keyHex string, socksPort int) error {
|
||||
return err
|
||||
}
|
||||
|
||||
clientID := uint32(time.Now().UnixNano() & 0xFFFFFFFF)
|
||||
|
||||
c := &Client{
|
||||
cipher: cipher,
|
||||
cipher: cipher,
|
||||
clientID: clientID,
|
||||
}
|
||||
|
||||
c.mux = mux.New(func(frame []byte) error {
|
||||
c.mux = mux.New(c.clientID, func(frame []byte) error {
|
||||
encrypted, err := c.cipher.Encrypt(frame)
|
||||
if err != nil {
|
||||
return err
|
||||
@@ -90,12 +94,13 @@ func Run(roomURL, keyHex string, socksPort int) error {
|
||||
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
|
||||
resetFrame := make([]byte, 4)
|
||||
resetFrame := make([]byte, 8)
|
||||
binary.BigEndian.PutUint16(resetFrame[0:2], 0xFFFF)
|
||||
binary.BigEndian.PutUint16(resetFrame[2:4], 0xFFFF)
|
||||
binary.BigEndian.PutUint32(resetFrame[4:8], c.clientID)
|
||||
encrypted, _ := cipher.Encrypt(resetFrame)
|
||||
peer.Send(encrypted)
|
||||
log.Println("Sent reset signal to server")
|
||||
log.Printf("Sent reset signal to server (clientID=%d)", c.clientID)
|
||||
|
||||
go peer.WatchConnection(ctx)
|
||||
|
||||
|
||||
@@ -10,24 +10,27 @@ import (
|
||||
)
|
||||
|
||||
type Stream struct {
|
||||
ID uint16
|
||||
recvBuf []byte
|
||||
closed bool
|
||||
mu sync.Mutex
|
||||
ID uint16
|
||||
ClientID uint32
|
||||
recvBuf []byte
|
||||
closed bool
|
||||
mu sync.Mutex
|
||||
}
|
||||
|
||||
type Multiplexer struct {
|
||||
streams map[uint16]*Stream
|
||||
nextID uint16
|
||||
onSend func([]byte) error
|
||||
mu sync.RWMutex
|
||||
streams map[uint16]*Stream
|
||||
nextID uint16
|
||||
clientID uint32
|
||||
onSend func([]byte) error
|
||||
mu sync.RWMutex
|
||||
}
|
||||
|
||||
func New(onSend func([]byte) error) *Multiplexer {
|
||||
func New(clientID uint32, onSend func([]byte) error) *Multiplexer {
|
||||
return &Multiplexer{
|
||||
streams: make(map[uint16]*Stream),
|
||||
nextID: 1,
|
||||
onSend: onSend,
|
||||
streams: make(map[uint16]*Stream),
|
||||
nextID: 1,
|
||||
clientID: clientID,
|
||||
onSend: onSend,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -63,10 +66,11 @@ func (m *Multiplexer) SendData(sid uint16, data []byte) error {
|
||||
}
|
||||
|
||||
chunk := data[i:end]
|
||||
frame := make([]byte, 4+len(chunk))
|
||||
binary.BigEndian.PutUint16(frame[0:2], sid)
|
||||
binary.BigEndian.PutUint16(frame[2:4], uint16(len(chunk)))
|
||||
copy(frame[4:], chunk)
|
||||
frame := make([]byte, 8+len(chunk))
|
||||
binary.BigEndian.PutUint32(frame[0:4], m.clientID)
|
||||
binary.BigEndian.PutUint16(frame[4:6], sid)
|
||||
binary.BigEndian.PutUint16(frame[6:8], uint16(len(chunk)))
|
||||
copy(frame[8:], chunk)
|
||||
|
||||
if err := m.onSend(frame); err != nil {
|
||||
return err
|
||||
@@ -84,57 +88,63 @@ func (m *Multiplexer) CloseStream(sid uint16) error {
|
||||
stream.closed = true
|
||||
}
|
||||
|
||||
frame := make([]byte, 4)
|
||||
binary.BigEndian.PutUint16(frame[0:2], sid)
|
||||
binary.BigEndian.PutUint16(frame[2:4], 0)
|
||||
frame := make([]byte, 8)
|
||||
binary.BigEndian.PutUint32(frame[0:4], m.clientID)
|
||||
binary.BigEndian.PutUint16(frame[4:6], sid)
|
||||
binary.BigEndian.PutUint16(frame[6:8], 0)
|
||||
|
||||
return m.onSend(frame)
|
||||
}
|
||||
|
||||
func (m *Multiplexer) HandleFrame(frame []byte) {
|
||||
if len(frame) < 4 {
|
||||
if len(frame) < 8 {
|
||||
return
|
||||
}
|
||||
|
||||
sid := binary.BigEndian.Uint16(frame[0:2])
|
||||
length := binary.BigEndian.Uint16(frame[2:4])
|
||||
clientID := binary.BigEndian.Uint32(frame[0:4])
|
||||
sid := binary.BigEndian.Uint16(frame[4:6])
|
||||
length := binary.BigEndian.Uint16(frame[6:8])
|
||||
|
||||
if sid == 0xFFFF && length == 0xFFFF {
|
||||
m.mu.Lock()
|
||||
for _, stream := range m.streams {
|
||||
stream.closed = true
|
||||
for streamSid, stream := range m.streams {
|
||||
if stream.ClientID == clientID {
|
||||
stream.closed = true
|
||||
delete(m.streams, streamSid)
|
||||
}
|
||||
}
|
||||
m.streams = make(map[uint16]*Stream)
|
||||
m.nextID = 1
|
||||
m.mu.Unlock()
|
||||
return
|
||||
}
|
||||
|
||||
if length == 0 {
|
||||
m.mu.Lock()
|
||||
if stream, exists := m.streams[sid]; exists {
|
||||
if stream, exists := m.streams[sid]; exists && stream.ClientID == clientID {
|
||||
stream.closed = true
|
||||
}
|
||||
m.mu.Unlock()
|
||||
return
|
||||
}
|
||||
|
||||
if len(frame) < 4+int(length) {
|
||||
if len(frame) < 8+int(length) {
|
||||
return
|
||||
}
|
||||
|
||||
data := frame[4 : 4+length]
|
||||
data := frame[8 : 8+length]
|
||||
|
||||
m.mu.Lock()
|
||||
stream, exists := m.streams[sid]
|
||||
if !exists {
|
||||
stream = &Stream{
|
||||
ID: sid,
|
||||
recvBuf: make([]byte, 0),
|
||||
ID: sid,
|
||||
ClientID: clientID,
|
||||
recvBuf: make([]byte, 0),
|
||||
}
|
||||
m.streams[sid] = stream
|
||||
}
|
||||
stream.recvBuf = append(stream.recvBuf, data...)
|
||||
if stream.ClientID == clientID {
|
||||
stream.recvBuf = append(stream.recvBuf, data...)
|
||||
}
|
||||
m.mu.Unlock()
|
||||
}
|
||||
|
||||
@@ -171,6 +181,12 @@ func (m *Multiplexer) GetStreams() []uint16 {
|
||||
return sids
|
||||
}
|
||||
|
||||
func (m *Multiplexer) GetStream(sid uint16) *Stream {
|
||||
m.mu.RLock()
|
||||
defer m.mu.RUnlock()
|
||||
return m.streams[sid]
|
||||
}
|
||||
|
||||
func (m *Multiplexer) Reset() {
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
|
||||
@@ -60,7 +60,7 @@ func Run(roomURL, keyHex string) error {
|
||||
connections: make(map[uint16]net.Conn),
|
||||
}
|
||||
|
||||
s.mux = mux.New(func(frame []byte) error {
|
||||
s.mux = mux.New(0, func(frame []byte) error {
|
||||
encrypted, err := s.cipher.Encrypt(frame)
|
||||
if err != nil {
|
||||
return err
|
||||
@@ -119,18 +119,22 @@ func (s *Server) onData(data []byte) {
|
||||
return
|
||||
}
|
||||
|
||||
if len(plaintext) >= 4 {
|
||||
sid := binary.BigEndian.Uint16(plaintext[0:2])
|
||||
length := binary.BigEndian.Uint16(plaintext[2:4])
|
||||
if len(plaintext) >= 8 {
|
||||
clientID := binary.BigEndian.Uint32(plaintext[0:4])
|
||||
sid := binary.BigEndian.Uint16(plaintext[4:6])
|
||||
length := binary.BigEndian.Uint16(plaintext[6:8])
|
||||
|
||||
if sid == 0xFFFF && length == 0xFFFF {
|
||||
log.Println("Received reset signal from client - cleaning up")
|
||||
log.Printf("Received reset signal from client (clientID=%d) - cleaning up", clientID)
|
||||
s.connMu.Lock()
|
||||
for sid, conn := range s.connections {
|
||||
if conn != nil {
|
||||
conn.Close()
|
||||
for streamSid, conn := range s.connections {
|
||||
stream := s.mux.GetStream(streamSid)
|
||||
if stream != nil && stream.ClientID == clientID {
|
||||
if conn != nil {
|
||||
conn.Close()
|
||||
}
|
||||
delete(s.connections, streamSid)
|
||||
}
|
||||
delete(s.connections, sid)
|
||||
}
|
||||
s.connMu.Unlock()
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user