From ca0191d0dee5d8895b26d20d058ca7a2156a7e94 Mon Sep 17 00:00:00 2001 From: Qtozdec <56160254+qtozdec@users.noreply.github.com> Date: Fri, 10 Apr 2026 16:03:42 +0300 Subject: [PATCH] Improve Telemost session behavior --- internal/client/client.go | 33 ++-- internal/mux/mux.go | 3 +- internal/names/names.go | 11 +- internal/names/names_test.go | 19 ++ internal/server/server.go | 71 +++---- internal/telemost/peer.go | 334 +++++++++++++++++++++++++++++++-- internal/telemost/peer_test.go | 26 +++ 7 files changed, 429 insertions(+), 68 deletions(-) create mode 100644 internal/names/names_test.go create mode 100644 internal/telemost/peer_test.go diff --git a/internal/client/client.go b/internal/client/client.go index 8188fc0..91de118 100644 --- a/internal/client/client.go +++ b/internal/client/client.go @@ -14,12 +14,12 @@ import ( "sync/atomic" "time" - "github.com/pion/webrtc/v4" "github.com/openlibrecommunity/olcrtc/internal/crypto" "github.com/openlibrecommunity/olcrtc/internal/logger" "github.com/openlibrecommunity/olcrtc/internal/mux" "github.com/openlibrecommunity/olcrtc/internal/names" "github.com/openlibrecommunity/olcrtc/internal/telemost" + "github.com/pion/webrtc/v4" ) type Client struct { @@ -32,6 +32,9 @@ type Client struct { } func Run(ctx context.Context, roomURL, keyHex string, socksPort int, duo bool, socksUser, socksPass string) error { + runCtx, cancel := context.WithCancel(ctx) + defer cancel() + var key []byte var err error @@ -89,12 +92,12 @@ func Run(ctx context.Context, roomURL, keyHex string, socksPort int, duo bool, s } time.Sleep(10 * time.Millisecond) } - + encrypted, err := c.cipher.Encrypt(frame) if err != nil { return err } - + idx := c.peerIdx.Add(1) % uint32(len(c.peers)) return c.peers[idx].Send(encrypted) }) @@ -104,11 +107,15 @@ func Run(ctx context.Context, roomURL, keyHex string, socksPort int, duo bool, s if err != nil { return err } + peer.SetEndedCallback(func(reason string) { + log.Printf("Client peer %d reported conference end: %s", i, reason) + cancel() + }) c.peers = append(c.peers, peer) peer.SetReconnectCallback(func(dc *webrtc.DataChannel) { log.Printf("Client peer %d reconnected - resetting multiplexer state", i) - + c.mux.UpdateSendFunc(func(frame []byte) error { encrypted, err := c.cipher.Encrypt(frame) if err != nil { @@ -117,14 +124,14 @@ func Run(ctx context.Context, roomURL, keyHex string, socksPort int, duo bool, s idx := c.peerIdx.Add(1) % uint32(len(c.peers)) return c.peers[idx].Send(encrypted) }) - + c.mux.Reset() - + log.Println("Client multiplexer reset complete") }) log.Printf("Connecting peer %d to Telemost...", i) - if err := peer.Connect(ctx); err != nil { + if err := peer.Connect(runCtx); err != nil { return err } log.Printf("Peer %d connected", i) @@ -132,30 +139,30 @@ func Run(ctx context.Context, roomURL, keyHex string, socksPort int, duo bool, s c.wg.Add(1) go func() { defer c.wg.Done() - peer.WatchConnection(ctx) + peer.WatchConnection(runCtx) }() } time.Sleep(100 * time.Millisecond) - + resetFrame := make([]byte, 12) binary.BigEndian.PutUint32(resetFrame[0:4], c.clientID) binary.BigEndian.PutUint16(resetFrame[4:6], 0xFFFF) binary.BigEndian.PutUint16(resetFrame[6:8], 0xFFFF) binary.BigEndian.PutUint32(resetFrame[8:12], 0) encrypted, _ := cipher.Encrypt(resetFrame) - + for _, peer := range c.peers { peer.Send(encrypted) } log.Printf("Sent reset signal to server (clientID=%d)", c.clientID) - err = c.runSOCKS5(ctx, socksPort, socksUser, socksPass) - + err = c.runSOCKS5(runCtx, socksPort, socksUser, socksPass) + log.Println("Waiting for client goroutines...") c.wg.Wait() log.Println("Client goroutines finished") - + return err } diff --git a/internal/mux/mux.go b/internal/mux/mux.go index 5f25fb7..3b625cf 100644 --- a/internal/mux/mux.go +++ b/internal/mux/mux.go @@ -87,7 +87,8 @@ func (m *Multiplexer) SendData(sid uint16, data []byte) error { return nil } - const chunkSize = 7168 + // Keep encrypted DataChannel messages below Telemost's observed 8 KiB cap. + const chunkSize = 7000 totalChunks := (len(data) + chunkSize - 1) / chunkSize if totalChunks > 10 { diff --git a/internal/names/names.go b/internal/names/names.go index 43a3e02..02c37ff 100644 --- a/internal/names/names.go +++ b/internal/names/names.go @@ -76,11 +76,11 @@ func init() { } func LoadNameFiles(firstPath, lastPath string) error { - if names, err := loadNames(firstPath); err == nil { + if names, err := loadNames(firstPath); err == nil && len(names) > 0 { firstNames = names } - if names, err := loadNames(lastPath); err == nil { + if names, err := loadNames(lastPath); err == nil && len(names) > 0 { lastNames = names } @@ -88,6 +88,13 @@ func LoadNameFiles(firstPath, lastPath string) error { } func Generate() string { + if len(firstNames) == 0 { + firstNames = defaultFirstNames + } + if len(lastNames) == 0 { + lastNames = defaultLastNames + } + first := firstNames[rand.IntN(len(firstNames))] last := lastNames[rand.IntN(len(lastNames))] diff --git a/internal/names/names_test.go b/internal/names/names_test.go new file mode 100644 index 0000000..88c0ddc --- /dev/null +++ b/internal/names/names_test.go @@ -0,0 +1,19 @@ +package names + +import "testing" + +func TestGenerateFallsBackWhenListsAreEmpty(t *testing.T) { + oldFirst := firstNames + oldLast := lastNames + defer func() { + firstNames = oldFirst + lastNames = oldLast + }() + + firstNames = nil + lastNames = nil + + if got := Generate(); got == "" { + t.Fatal("Generate returned an empty display name") + } +} diff --git a/internal/server/server.go b/internal/server/server.go index 75efcc4..23725e2 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -13,12 +13,12 @@ import ( "sync/atomic" "time" - "github.com/pion/webrtc/v4" "github.com/openlibrecommunity/olcrtc/internal/crypto" "github.com/openlibrecommunity/olcrtc/internal/logger" "github.com/openlibrecommunity/olcrtc/internal/mux" "github.com/openlibrecommunity/olcrtc/internal/names" "github.com/openlibrecommunity/olcrtc/internal/telemost" + "github.com/pion/webrtc/v4" ) type Server struct { @@ -41,6 +41,9 @@ type ConnectRequest struct { } func Run(ctx context.Context, roomURL, keyHex string, duo bool, dnsServer string) error { + runCtx, cancel := context.WithCancel(ctx) + defer cancel() + var key []byte var err error @@ -76,11 +79,11 @@ func Run(ctx context.Context, roomURL, keyHex string, duo bool, dnsServer string peers: make([]*telemost.Peer, 0), dnsServer: dnsServer, } - + if dnsServer == "" { dnsServer = "1.1.1.1:53" } - + s.resolver = &net.Resolver{ PreferGo: true, Dial: func(ctx context.Context, network, address string) (net.Conn, error) { @@ -109,7 +112,7 @@ func Run(ctx context.Context, roomURL, keyHex string, duo bool, dnsServer string } time.Sleep(10 * time.Millisecond) } - + encrypted, err := s.cipher.Encrypt(frame) if err != nil { return err @@ -123,11 +126,15 @@ func Run(ctx context.Context, roomURL, keyHex string, duo bool, dnsServer string if err != nil { return err } + peer.SetEndedCallback(func(reason string) { + log.Printf("Server peer %d reported conference end: %s", i, reason) + cancel() + }) s.peers = append(s.peers, peer) peer.SetReconnectCallback(func(dc *webrtc.DataChannel) { log.Printf("Server peer %d reconnected - resetting multiplexer state", i) - + s.connMu.Lock() for sid, conn := range s.connections { if conn != nil { @@ -136,7 +143,7 @@ func Run(ctx context.Context, roomURL, keyHex string, duo bool, dnsServer string delete(s.connections, sid) } s.connMu.Unlock() - + if dc != nil { s.mux.UpdateSendFunc(func(frame []byte) error { encrypted, err := s.cipher.Encrypt(frame) @@ -147,14 +154,14 @@ func Run(ctx context.Context, roomURL, keyHex string, duo bool, dnsServer string return s.peers[idx].Send(encrypted) }) } - + s.mux.Reset() - + log.Println("Server multiplexer reset complete") }) log.Printf("Connecting peer %d to Telemost...", i) - if err := peer.Connect(ctx); err != nil { + if err := peer.Connect(runCtx); err != nil { return err } log.Printf("Peer %d connected", i) @@ -162,16 +169,16 @@ func Run(ctx context.Context, roomURL, keyHex string, duo bool, dnsServer string s.wg.Add(1) go func() { defer s.wg.Done() - peer.WatchConnection(ctx) + peer.WatchConnection(runCtx) }() } - err = s.run(ctx) - + err = s.run(runCtx) + log.Println("Waiting for server goroutines...") s.wg.Wait() log.Println("Server goroutines finished") - + return err } @@ -186,7 +193,7 @@ func (s *Server) onData(data []byte) { clientID := binary.BigEndian.Uint32(plaintext[0:4]) sid := binary.BigEndian.Uint16(plaintext[4:6]) length := binary.BigEndian.Uint16(plaintext[6:8]) - + if sid == 0xFFFF && length == 0xFFFF { log.Printf("Received reset signal from client (clientID=%d) - cleaning up", clientID) s.connMu.Lock() @@ -205,7 +212,7 @@ func (s *Server) onData(data []byte) { clientID := binary.BigEndian.Uint32(plaintext[0:4]) sid := binary.BigEndian.Uint16(plaintext[4:6]) length := binary.BigEndian.Uint16(plaintext[6:8]) - + if sid == 0xFFFF && length == 0xFFFF { log.Printf("Received reset signal from client (clientID=%d) - cleaning up", clientID) s.connMu.Lock() @@ -228,7 +235,7 @@ func (s *Server) onData(data []byte) { func (s *Server) run(ctx context.Context) error { ticker := time.NewTicker(10 * time.Millisecond) defer ticker.Stop() - + for { select { case <-ctx.Done(): @@ -240,21 +247,21 @@ func (s *Server) run(ctx context.Context) error { } } s.connMu.Unlock() - + log.Printf("Closing %d peer(s)...", len(s.peers)) for i, peer := range s.peers { log.Printf("Closing peer %d...", i) peer.Close() } log.Println("All peers closed") - + return nil - + case <-ticker.C: } - + sids := s.mux.GetStreams() - + for _, sid := range sids { go func(sid uint16) { data := s.mux.ReadStream(sid) @@ -262,7 +269,7 @@ func (s *Server) run(ctx context.Context) error { s.connMu.RLock() conn, exists := s.connections[sid] s.connMu.RUnlock() - + if exists && conn != nil { if _, err := conn.Write(data); err != nil { s.mux.CloseStream(sid) @@ -315,28 +322,28 @@ func (s *Server) handleConnect(sid uint16, req ConnectRequest) { s.connMu.Unlock() dialStart := time.Now() - + dialer := &net.Dialer{ Timeout: 10 * time.Second, KeepAlive: 30 * time.Second, Resolver: s.resolver, } - + conn, err := dialer.Dial("tcp4", addr) dialElapsed := time.Since(dialStart) - + if err != nil { log.Printf("[SERVER] sid=%d CONNECT_FAILED dial_time=%v total_elapsed=%v err=%v", sid, dialElapsed, time.Since(startTime), err) go s.mux.CloseStream(sid) return } - + logger.Verbose("TCP dial took %v for sid=%d", dialElapsed, sid) - + s.connMu.Lock() s.connections[sid] = conn s.connMu.Unlock() - + log.Printf("[SERVER] sid=%d CONNECT_SUCCESS dial_time=%v", sid, dialElapsed) s.mux.SendData(sid, []byte{0x00}) @@ -348,11 +355,11 @@ func (s *Server) handleConnect(sid uint16, req ConnectRequest) { delete(s.connections, sid) s.connMu.Unlock() }() - + buf := make([]byte, 16384) totalSent := uint64(0) lastLog := time.Now() - + for { n, err := conn.Read(buf) if err != nil { @@ -361,7 +368,7 @@ func (s *Server) handleConnect(sid uint16, req ConnectRequest) { } return } - + for !s.canSendData() { time.Sleep(20 * time.Millisecond) } @@ -369,7 +376,7 @@ func (s *Server) handleConnect(sid uint16, req ConnectRequest) { if err := s.mux.SendData(sid, buf[:n]); err != nil { return } - + totalSent += uint64(n) if time.Since(lastLog) > 5*time.Second { log.Printf("[SERVER] sid=%d TRANSFER_PROGRESS sent=%d MB", sid, totalSent/(1024*1024)) diff --git a/internal/telemost/peer.go b/internal/telemost/peer.go index 633e003..667732d 100644 --- a/internal/telemost/peer.go +++ b/internal/telemost/peer.go @@ -1,9 +1,13 @@ package telemost import ( + "bytes" "context" + "encoding/json" "fmt" "log" + "math/rand/v2" + "net/http" "strings" "sync" "sync/atomic" @@ -16,6 +20,19 @@ import ( "github.com/pion/webrtc/v4" ) +const ( + realDataChannelMessageLimit = 8192 + defaultSendDelayMin = 2 * time.Millisecond + defaultSendDelayMax = 12 * time.Millisecond + defaultTelemetryInterval = 20 * time.Second +) + +type TrafficShape struct { + MaxMessageSize int + MinDelay time.Duration + MaxDelay time.Duration +} + type Peer struct { roomURL string name string @@ -30,11 +47,18 @@ type Peer struct { reconnectCh chan struct{} closeCh chan struct{} keepAliveCh chan struct{} + telemetryCh chan struct{} lastReconnect time.Time reconnectCount int reconnectMu sync.Mutex sendQueue chan []byte sendQueueClosed atomic.Bool + closed atomic.Bool + telemetryActive atomic.Bool + ackMu sync.Mutex + ackWaiters map[string]chan struct{} + onEnded func(string) + trafficShape TrafficShape wg sync.WaitGroup } @@ -49,6 +73,20 @@ func (p *Peer) GetBufferedAmount() uint64 { return 0 } +func (p *Peer) SetEndedCallback(cb func(string)) { + p.onEnded = cb +} + +func (p *Peer) SetTrafficShape(shape TrafficShape) { + if shape.MaxMessageSize <= 0 { + shape.MaxMessageSize = realDataChannelMessageLimit + } + if shape.MaxDelay < shape.MinDelay { + shape.MaxDelay = shape.MinDelay + } + p.trafficShape = shape +} + func NewPeer(roomURL, name string, onData func([]byte)) (*Peer, error) { conn, err := GetConnectionInfo(roomURL, name) if err != nil { @@ -63,11 +101,20 @@ func NewPeer(roomURL, name string, onData func([]byte)) (*Peer, error) { reconnectCh: make(chan struct{}, 1), closeCh: make(chan struct{}), keepAliveCh: make(chan struct{}), + telemetryCh: make(chan struct{}, 1), sendQueue: make(chan []byte, 5000), + ackWaiters: make(map[string]chan struct{}), + trafficShape: TrafficShape{ + MaxMessageSize: realDataChannelMessageLimit, + MinDelay: defaultSendDelayMin, + MaxDelay: defaultSendDelayMax, + }, }, nil } func (p *Peer) Connect(ctx context.Context) error { + p.closed.Store(false) + config := webrtc.Configuration{ ICEServers: []webrtc.ICEServer{ {URLs: []string{"stun:stun.rtc.yandex.net:3478"}}, @@ -89,7 +136,7 @@ func (p *Peer) Connect(ctx context.Context) error { p.pcSub.OnConnectionStateChange(func(state webrtc.PeerConnectionState) { log.Printf("Subscriber PeerConnection state: %s", state.String()) - if state == webrtc.PeerConnectionStateFailed || state == webrtc.PeerConnectionStateDisconnected { + if !p.closed.Load() && (state == webrtc.PeerConnectionStateFailed || state == webrtc.PeerConnectionStateDisconnected) { select { case p.reconnectCh <- struct{}{}: default: @@ -104,7 +151,7 @@ func (p *Peer) Connect(ctx context.Context) error { p.pcPub.OnConnectionStateChange(func(state webrtc.PeerConnectionState) { log.Printf("Publisher PeerConnection state: %s", state.String()) - if state == webrtc.PeerConnectionStateFailed || state == webrtc.PeerConnectionStateDisconnected { + if !p.closed.Load() && (state == webrtc.PeerConnectionStateFailed || state == webrtc.PeerConnectionStateDisconnected) { select { case p.reconnectCh <- struct{}{}: default: @@ -145,9 +192,11 @@ func (p *Peer) Connect(ctx context.Context) error { log.Println("Calling reconnect callback for cleanup") p.onReconnect(nil) } - select { - case p.reconnectCh <- struct{}{}: - default: + if !p.closed.Load() { + select { + case p.reconnectCh <- struct{}{}: + default: + } } }) @@ -161,9 +210,11 @@ func (p *Peer) Connect(ctx context.Context) error { log.Printf("Received datachannel: %s", dc.Label()) dc.OnClose(func() { log.Println("Received DataChannel closed - triggering reconnect") - select { - case p.reconnectCh <- struct{}{}: - default: + if !p.closed.Load() { + select { + case p.reconnectCh <- struct{}{}: + default: + } } }) dc.OnMessage(func(msg webrtc.DataChannelMessage) { @@ -289,9 +340,11 @@ func (p *Peer) handleSignaling() { var msg map[string]interface{} if err := p.ws.ReadJSON(&msg); err != nil { log.Printf("WS read error: %v", err) - select { - case p.reconnectCh <- struct{}{}: - default: + if !p.closed.Load() { + select { + case p.reconnectCh <- struct{}{}: + default: + } } return } @@ -304,7 +357,12 @@ func (p *Peer) handleSignaling() { uid, _ := msg["uid"].(string) - if _, ok := msg["serverHello"]; ok { + if _, ok := msg["ack"]; ok { + p.resolveAck(uid) + } + + if serverHello, ok := msg["serverHello"].(map[string]interface{}); ok { + p.startTelemetry(serverHello) p.sendAck(uid) } @@ -316,6 +374,11 @@ func (p *Peer) handleSignaling() { p.sendAck(uid) } + if isConferenceEndMessage(msg) { + p.signalEnded("conference ended") + return + } + if _, ok := msg["ping"]; ok { p.sendPong(uid) continue @@ -430,6 +493,10 @@ func (p *Peer) handleICE(cand map[string]interface{}) { } func (p *Peer) sendAck(uid string) { + if uid == "" { + return + } + p.wsMu.Lock() defer p.wsMu.Unlock() @@ -443,6 +510,53 @@ func (p *Peer) sendAck(uid string) { }) } +func (p *Peer) registerAckWaiter(uid string) chan struct{} { + ch := make(chan struct{}) + p.ackMu.Lock() + p.ackWaiters[uid] = ch + p.ackMu.Unlock() + return ch +} + +func (p *Peer) removeAckWaiter(uid string) { + p.ackMu.Lock() + delete(p.ackWaiters, uid) + p.ackMu.Unlock() +} + +func (p *Peer) waitForAck(uid string, ch <-chan struct{}, timeout time.Duration) bool { + if uid == "" { + return false + } + + defer func() { + p.removeAckWaiter(uid) + }() + + select { + case <-ch: + return true + case <-time.After(timeout): + return false + case <-p.closeCh: + return false + } +} + +func (p *Peer) resolveAck(uid string) { + if uid == "" { + return + } + + p.ackMu.Lock() + ch := p.ackWaiters[uid] + if ch != nil { + delete(p.ackWaiters, uid) + close(ch) + } + p.ackMu.Unlock() +} + func (p *Peer) sendPong(uid string) { p.wsMu.Lock() defer p.wsMu.Unlock() @@ -453,6 +567,149 @@ func (p *Peer) sendPong(uid string) { }) } +func (p *Peer) startTelemetry(serverHello map[string]interface{}) { + cfg, ok := serverHello["telemetryConfiguration"].(map[string]interface{}) + if !ok { + return + } + + endpoint, _ := cfg["logEndpoint"].(string) + if endpoint == "" { + endpoint, _ = cfg["endpoint"].(string) + } + if endpoint == "" { + endpoint, _ = cfg["url"].(string) + } + if endpoint == "" { + logger.Verbose("Telemetry configuration has no endpoint; skipping XHR simulation") + return + } + + interval := defaultTelemetryInterval + if raw, ok := cfg["sendingInterval"].(float64); ok && raw > 0 { + interval = time.Duration(raw) * time.Millisecond + } + + if !p.telemetryActive.CompareAndSwap(false, true) { + return + } + + p.wg.Add(1) + go func() { + defer p.wg.Done() + defer p.telemetryActive.Store(false) + + ticker := time.NewTicker(interval) + defer ticker.Stop() + + p.sendTelemetry(endpoint, "join") + for { + select { + case <-ticker.C: + p.sendTelemetry(endpoint, "stats") + case <-p.telemetryCh: + p.sendTelemetry(endpoint, "leave") + return + case <-p.closeCh: + p.sendTelemetry(endpoint, "leave") + return + } + } + }() +} + +func (p *Peer) stopTelemetry() { + if p.telemetryActive.Load() { + select { + case p.telemetryCh <- struct{}{}: + default: + } + } +} + +func (p *Peer) sendTelemetry(endpoint, event string) { + body, err := json.Marshal(map[string]interface{}{ + "event": event, + "timestamp": time.Now().UnixMilli(), + "peerId": p.conn.PeerID, + "roomId": p.conn.RoomID, + "displayName": p.name, + "implementation": "olcrtc-go", + "dataChannel": map[string]interface{}{ + "bufferedAmount": p.GetBufferedAmount(), + "sendQueue": len(p.sendQueue), + }, + }) + if err != nil { + return + } + + req, err := http.NewRequest(http.MethodPost, endpoint, bytes.NewReader(body)) + if err != nil { + logger.Verbose("Telemetry request skipped: %v", err) + return + } + req.Header.Set("Content-Type", "application/json") + req.Header.Set("User-Agent", "Mozilla/5.0 (X11; Linux x86_64; rv:149.0) Gecko/20100101 Firefox/149.0") + req.Header.Set("Origin", "https://telemost.yandex.ru") + req.Header.Set("Referer", p.roomURL) + req.Header.Set("X-Requested-With", "XMLHttpRequest") + req.Header.Set("Client-Instance-Id", uuid.New().String()) + req.Header.Set("X-Telemost-Client-Version", "187.1.0") + req.Header.Set("Idempotency-Key", uuid.New().String()) + + client := protect.NewHTTPClient() + resp, err := client.Do(req) + if err != nil { + logger.Verbose("Telemetry send failed: %v", err) + return + } + defer resp.Body.Close() + if resp.StatusCode >= 400 { + logger.Verbose("Telemetry endpoint returned %s", resp.Status) + } +} + +func (p *Peer) signalEnded(reason string) { + log.Printf("Conference ended: %s", reason) + p.closed.Store(true) + p.stopTelemetry() + if p.onEnded != nil { + p.onEnded(reason) + } +} + +func isConferenceEndMessage(msg map[string]interface{}) bool { + for _, key := range []string{"conferenceClosed", "conferenceEnded", "roomClosed", "roomEnded", "callEnded"} { + if _, ok := msg[key]; ok { + return true + } + } + + if raw, ok := msg["conference"].(map[string]interface{}); ok { + if state, _ := raw["state"].(string); isEndedState(state) { + return true + } + } + + if raw, ok := msg["conferenceState"].(map[string]interface{}); ok { + if state, _ := raw["state"].(string); isEndedState(state) { + return true + } + } + + return false +} + +func isEndedState(state string) bool { + switch strings.ToLower(state) { + case "closed", "ended", "finished", "terminated": + return true + default: + return false + } +} + func (p *Peer) setupICEHandlers() { p.pcSub.OnICECandidate(func(c *webrtc.ICECandidate) { if c == nil { @@ -495,36 +752,51 @@ func (p *Peer) setupICEHandlers() { }) } -func (p *Peer) sendLeave() { +func (p *Peer) sendLeave(uid string) bool { p.wsMu.Lock() defer p.wsMu.Unlock() if p.ws == nil { log.Println("WebSocket already closed, cannot send leave") - return + return false } leave := map[string]interface{}{ - "uid": uuid.New().String(), + "uid": uid, "leave": map[string]interface{}{}, } if err := p.ws.WriteJSON(leave); err != nil { log.Printf("Failed to send leave: %v", err) + return false } else { log.Println("Sent leave message to server") } + return true } func (p *Peer) Close() error { log.Println("Closing peer connection...") + alreadyClosing := p.closed.Swap(true) p.sendQueueClosed.Store(true) - log.Println("Sending leave message...") - p.sendLeave() + if !alreadyClosing { + log.Println("Sending leave message...") + leaveUID := uuid.New().String() + leaveAck := p.registerAckWaiter(leaveUID) + if p.sendLeave(leaveUID) { + if p.waitForAck(leaveUID, leaveAck, 1500*time.Millisecond) { + log.Println("Leave acknowledged") + } else { + log.Println("Leave ack timeout") + } + } else { + p.removeAckWaiter(leaveUID) + } - time.Sleep(1 * time.Second) + p.stopTelemetry() + } log.Println("Closing channels...") if p.closeCh != nil { @@ -627,7 +899,7 @@ func (p *Peer) keepAlive() { func (p *Peer) reconnect(ctx context.Context) error { log.Println("Reconnecting...") - p.sendLeave() + p.sendLeave(uuid.New().String()) time.Sleep(500 * time.Millisecond) close(p.keepAliveCh) @@ -727,10 +999,20 @@ func (p *Peer) processSendQueue(workerID int) { for { select { - case data := <-p.sendQueue: + case data, ok := <-p.sendQueue: + if !ok { + return + } if p.dc == nil || p.dc.ReadyState() != webrtc.DataChannelStateOpen { continue } + if p.trafficShape.MaxMessageSize > 0 && len(data) > p.trafficShape.MaxMessageSize { + log.Printf("[WORKER-%d] Refusing oversized DataChannel message size=%d limit=%d", workerID, len(data), p.trafficShape.MaxMessageSize) + continue + } + if delay := p.nextSendDelay(); delay > 0 { + time.Sleep(delay) + } // Wait until SCTP buffer drains. Dropping here would corrupt the // carried TCP streams (the mux is a reliable transport); large @@ -804,3 +1086,15 @@ func (p *Peer) CanSend() bool { } return queueLen < 1000 && buffered < 3*1024*1024 } + +func (p *Peer) nextSendDelay() time.Duration { + minDelay := p.trafficShape.MinDelay + maxDelay := p.trafficShape.MaxDelay + if maxDelay <= 0 { + return 0 + } + if maxDelay <= minDelay { + return maxDelay + } + return minDelay + time.Duration(rand.Int64N(int64(maxDelay-minDelay))) +} diff --git a/internal/telemost/peer_test.go b/internal/telemost/peer_test.go new file mode 100644 index 0000000..34e935c --- /dev/null +++ b/internal/telemost/peer_test.go @@ -0,0 +1,26 @@ +package telemost + +import "testing" + +func TestIsConferenceEndMessage(t *testing.T) { + tests := []map[string]interface{}{ + {"conferenceEnded": map[string]interface{}{}}, + {"conference": map[string]interface{}{"state": "closed"}}, + {"conferenceState": map[string]interface{}{"state": "TERMINATED"}}, + } + + for _, tt := range tests { + if !isConferenceEndMessage(tt) { + t.Fatalf("expected end message for %#v", tt) + } + } +} + +func TestIsConferenceEndMessageIgnoresActiveState(t *testing.T) { + msg := map[string]interface{}{ + "conference": map[string]interface{}{"state": "active"}, + } + if isConferenceEndMessage(msg) { + t.Fatal("active conference state must not be treated as ended") + } +}