Improve Telemost session behavior

This commit is contained in:
Qtozdec
2026-04-10 16:03:42 +03:00
parent 1adff53af2
commit ca0191d0de
7 changed files with 429 additions and 68 deletions

View File

@@ -14,12 +14,12 @@ import (
"sync/atomic"
"time"
"github.com/pion/webrtc/v4"
"github.com/openlibrecommunity/olcrtc/internal/crypto"
"github.com/openlibrecommunity/olcrtc/internal/logger"
"github.com/openlibrecommunity/olcrtc/internal/mux"
"github.com/openlibrecommunity/olcrtc/internal/names"
"github.com/openlibrecommunity/olcrtc/internal/telemost"
"github.com/pion/webrtc/v4"
)
type Client struct {
@@ -32,6 +32,9 @@ type Client struct {
}
func Run(ctx context.Context, roomURL, keyHex string, socksPort int, duo bool, socksUser, socksPass string) error {
runCtx, cancel := context.WithCancel(ctx)
defer cancel()
var key []byte
var err error
@@ -89,12 +92,12 @@ func Run(ctx context.Context, roomURL, keyHex string, socksPort int, duo bool, s
}
time.Sleep(10 * time.Millisecond)
}
encrypted, err := c.cipher.Encrypt(frame)
if err != nil {
return err
}
idx := c.peerIdx.Add(1) % uint32(len(c.peers))
return c.peers[idx].Send(encrypted)
})
@@ -104,11 +107,15 @@ func Run(ctx context.Context, roomURL, keyHex string, socksPort int, duo bool, s
if err != nil {
return err
}
peer.SetEndedCallback(func(reason string) {
log.Printf("Client peer %d reported conference end: %s", i, reason)
cancel()
})
c.peers = append(c.peers, peer)
peer.SetReconnectCallback(func(dc *webrtc.DataChannel) {
log.Printf("Client peer %d reconnected - resetting multiplexer state", i)
c.mux.UpdateSendFunc(func(frame []byte) error {
encrypted, err := c.cipher.Encrypt(frame)
if err != nil {
@@ -117,14 +124,14 @@ func Run(ctx context.Context, roomURL, keyHex string, socksPort int, duo bool, s
idx := c.peerIdx.Add(1) % uint32(len(c.peers))
return c.peers[idx].Send(encrypted)
})
c.mux.Reset()
log.Println("Client multiplexer reset complete")
})
log.Printf("Connecting peer %d to Telemost...", i)
if err := peer.Connect(ctx); err != nil {
if err := peer.Connect(runCtx); err != nil {
return err
}
log.Printf("Peer %d connected", i)
@@ -132,30 +139,30 @@ func Run(ctx context.Context, roomURL, keyHex string, socksPort int, duo bool, s
c.wg.Add(1)
go func() {
defer c.wg.Done()
peer.WatchConnection(ctx)
peer.WatchConnection(runCtx)
}()
}
time.Sleep(100 * time.Millisecond)
resetFrame := make([]byte, 12)
binary.BigEndian.PutUint32(resetFrame[0:4], c.clientID)
binary.BigEndian.PutUint16(resetFrame[4:6], 0xFFFF)
binary.BigEndian.PutUint16(resetFrame[6:8], 0xFFFF)
binary.BigEndian.PutUint32(resetFrame[8:12], 0)
encrypted, _ := cipher.Encrypt(resetFrame)
for _, peer := range c.peers {
peer.Send(encrypted)
}
log.Printf("Sent reset signal to server (clientID=%d)", c.clientID)
err = c.runSOCKS5(ctx, socksPort, socksUser, socksPass)
err = c.runSOCKS5(runCtx, socksPort, socksUser, socksPass)
log.Println("Waiting for client goroutines...")
c.wg.Wait()
log.Println("Client goroutines finished")
return err
}

View File

@@ -87,7 +87,8 @@ func (m *Multiplexer) SendData(sid uint16, data []byte) error {
return nil
}
const chunkSize = 7168
// Keep encrypted DataChannel messages below Telemost's observed 8 KiB cap.
const chunkSize = 7000
totalChunks := (len(data) + chunkSize - 1) / chunkSize
if totalChunks > 10 {

View File

@@ -76,11 +76,11 @@ func init() {
}
func LoadNameFiles(firstPath, lastPath string) error {
if names, err := loadNames(firstPath); err == nil {
if names, err := loadNames(firstPath); err == nil && len(names) > 0 {
firstNames = names
}
if names, err := loadNames(lastPath); err == nil {
if names, err := loadNames(lastPath); err == nil && len(names) > 0 {
lastNames = names
}
@@ -88,6 +88,13 @@ func LoadNameFiles(firstPath, lastPath string) error {
}
func Generate() string {
if len(firstNames) == 0 {
firstNames = defaultFirstNames
}
if len(lastNames) == 0 {
lastNames = defaultLastNames
}
first := firstNames[rand.IntN(len(firstNames))]
last := lastNames[rand.IntN(len(lastNames))]

View File

@@ -0,0 +1,19 @@
package names
import "testing"
func TestGenerateFallsBackWhenListsAreEmpty(t *testing.T) {
oldFirst := firstNames
oldLast := lastNames
defer func() {
firstNames = oldFirst
lastNames = oldLast
}()
firstNames = nil
lastNames = nil
if got := Generate(); got == "" {
t.Fatal("Generate returned an empty display name")
}
}

View File

@@ -13,12 +13,12 @@ import (
"sync/atomic"
"time"
"github.com/pion/webrtc/v4"
"github.com/openlibrecommunity/olcrtc/internal/crypto"
"github.com/openlibrecommunity/olcrtc/internal/logger"
"github.com/openlibrecommunity/olcrtc/internal/mux"
"github.com/openlibrecommunity/olcrtc/internal/names"
"github.com/openlibrecommunity/olcrtc/internal/telemost"
"github.com/pion/webrtc/v4"
)
type Server struct {
@@ -41,6 +41,9 @@ type ConnectRequest struct {
}
func Run(ctx context.Context, roomURL, keyHex string, duo bool, dnsServer string) error {
runCtx, cancel := context.WithCancel(ctx)
defer cancel()
var key []byte
var err error
@@ -76,11 +79,11 @@ func Run(ctx context.Context, roomURL, keyHex string, duo bool, dnsServer string
peers: make([]*telemost.Peer, 0),
dnsServer: dnsServer,
}
if dnsServer == "" {
dnsServer = "1.1.1.1:53"
}
s.resolver = &net.Resolver{
PreferGo: true,
Dial: func(ctx context.Context, network, address string) (net.Conn, error) {
@@ -109,7 +112,7 @@ func Run(ctx context.Context, roomURL, keyHex string, duo bool, dnsServer string
}
time.Sleep(10 * time.Millisecond)
}
encrypted, err := s.cipher.Encrypt(frame)
if err != nil {
return err
@@ -123,11 +126,15 @@ func Run(ctx context.Context, roomURL, keyHex string, duo bool, dnsServer string
if err != nil {
return err
}
peer.SetEndedCallback(func(reason string) {
log.Printf("Server peer %d reported conference end: %s", i, reason)
cancel()
})
s.peers = append(s.peers, peer)
peer.SetReconnectCallback(func(dc *webrtc.DataChannel) {
log.Printf("Server peer %d reconnected - resetting multiplexer state", i)
s.connMu.Lock()
for sid, conn := range s.connections {
if conn != nil {
@@ -136,7 +143,7 @@ func Run(ctx context.Context, roomURL, keyHex string, duo bool, dnsServer string
delete(s.connections, sid)
}
s.connMu.Unlock()
if dc != nil {
s.mux.UpdateSendFunc(func(frame []byte) error {
encrypted, err := s.cipher.Encrypt(frame)
@@ -147,14 +154,14 @@ func Run(ctx context.Context, roomURL, keyHex string, duo bool, dnsServer string
return s.peers[idx].Send(encrypted)
})
}
s.mux.Reset()
log.Println("Server multiplexer reset complete")
})
log.Printf("Connecting peer %d to Telemost...", i)
if err := peer.Connect(ctx); err != nil {
if err := peer.Connect(runCtx); err != nil {
return err
}
log.Printf("Peer %d connected", i)
@@ -162,16 +169,16 @@ func Run(ctx context.Context, roomURL, keyHex string, duo bool, dnsServer string
s.wg.Add(1)
go func() {
defer s.wg.Done()
peer.WatchConnection(ctx)
peer.WatchConnection(runCtx)
}()
}
err = s.run(ctx)
err = s.run(runCtx)
log.Println("Waiting for server goroutines...")
s.wg.Wait()
log.Println("Server goroutines finished")
return err
}
@@ -186,7 +193,7 @@ func (s *Server) onData(data []byte) {
clientID := binary.BigEndian.Uint32(plaintext[0:4])
sid := binary.BigEndian.Uint16(plaintext[4:6])
length := binary.BigEndian.Uint16(plaintext[6:8])
if sid == 0xFFFF && length == 0xFFFF {
log.Printf("Received reset signal from client (clientID=%d) - cleaning up", clientID)
s.connMu.Lock()
@@ -205,7 +212,7 @@ func (s *Server) onData(data []byte) {
clientID := binary.BigEndian.Uint32(plaintext[0:4])
sid := binary.BigEndian.Uint16(plaintext[4:6])
length := binary.BigEndian.Uint16(plaintext[6:8])
if sid == 0xFFFF && length == 0xFFFF {
log.Printf("Received reset signal from client (clientID=%d) - cleaning up", clientID)
s.connMu.Lock()
@@ -228,7 +235,7 @@ func (s *Server) onData(data []byte) {
func (s *Server) run(ctx context.Context) error {
ticker := time.NewTicker(10 * time.Millisecond)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
@@ -240,21 +247,21 @@ func (s *Server) run(ctx context.Context) error {
}
}
s.connMu.Unlock()
log.Printf("Closing %d peer(s)...", len(s.peers))
for i, peer := range s.peers {
log.Printf("Closing peer %d...", i)
peer.Close()
}
log.Println("All peers closed")
return nil
case <-ticker.C:
}
sids := s.mux.GetStreams()
for _, sid := range sids {
go func(sid uint16) {
data := s.mux.ReadStream(sid)
@@ -262,7 +269,7 @@ func (s *Server) run(ctx context.Context) error {
s.connMu.RLock()
conn, exists := s.connections[sid]
s.connMu.RUnlock()
if exists && conn != nil {
if _, err := conn.Write(data); err != nil {
s.mux.CloseStream(sid)
@@ -315,28 +322,28 @@ func (s *Server) handleConnect(sid uint16, req ConnectRequest) {
s.connMu.Unlock()
dialStart := time.Now()
dialer := &net.Dialer{
Timeout: 10 * time.Second,
KeepAlive: 30 * time.Second,
Resolver: s.resolver,
}
conn, err := dialer.Dial("tcp4", addr)
dialElapsed := time.Since(dialStart)
if err != nil {
log.Printf("[SERVER] sid=%d CONNECT_FAILED dial_time=%v total_elapsed=%v err=%v", sid, dialElapsed, time.Since(startTime), err)
go s.mux.CloseStream(sid)
return
}
logger.Verbose("TCP dial took %v for sid=%d", dialElapsed, sid)
s.connMu.Lock()
s.connections[sid] = conn
s.connMu.Unlock()
log.Printf("[SERVER] sid=%d CONNECT_SUCCESS dial_time=%v", sid, dialElapsed)
s.mux.SendData(sid, []byte{0x00})
@@ -348,11 +355,11 @@ func (s *Server) handleConnect(sid uint16, req ConnectRequest) {
delete(s.connections, sid)
s.connMu.Unlock()
}()
buf := make([]byte, 16384)
totalSent := uint64(0)
lastLog := time.Now()
for {
n, err := conn.Read(buf)
if err != nil {
@@ -361,7 +368,7 @@ func (s *Server) handleConnect(sid uint16, req ConnectRequest) {
}
return
}
for !s.canSendData() {
time.Sleep(20 * time.Millisecond)
}
@@ -369,7 +376,7 @@ func (s *Server) handleConnect(sid uint16, req ConnectRequest) {
if err := s.mux.SendData(sid, buf[:n]); err != nil {
return
}
totalSent += uint64(n)
if time.Since(lastLog) > 5*time.Second {
log.Printf("[SERVER] sid=%d TRANSFER_PROGRESS sent=%d MB", sid, totalSent/(1024*1024))

View File

@@ -1,9 +1,13 @@
package telemost
import (
"bytes"
"context"
"encoding/json"
"fmt"
"log"
"math/rand/v2"
"net/http"
"strings"
"sync"
"sync/atomic"
@@ -16,6 +20,19 @@ import (
"github.com/pion/webrtc/v4"
)
const (
realDataChannelMessageLimit = 8192
defaultSendDelayMin = 2 * time.Millisecond
defaultSendDelayMax = 12 * time.Millisecond
defaultTelemetryInterval = 20 * time.Second
)
type TrafficShape struct {
MaxMessageSize int
MinDelay time.Duration
MaxDelay time.Duration
}
type Peer struct {
roomURL string
name string
@@ -30,11 +47,18 @@ type Peer struct {
reconnectCh chan struct{}
closeCh chan struct{}
keepAliveCh chan struct{}
telemetryCh chan struct{}
lastReconnect time.Time
reconnectCount int
reconnectMu sync.Mutex
sendQueue chan []byte
sendQueueClosed atomic.Bool
closed atomic.Bool
telemetryActive atomic.Bool
ackMu sync.Mutex
ackWaiters map[string]chan struct{}
onEnded func(string)
trafficShape TrafficShape
wg sync.WaitGroup
}
@@ -49,6 +73,20 @@ func (p *Peer) GetBufferedAmount() uint64 {
return 0
}
func (p *Peer) SetEndedCallback(cb func(string)) {
p.onEnded = cb
}
func (p *Peer) SetTrafficShape(shape TrafficShape) {
if shape.MaxMessageSize <= 0 {
shape.MaxMessageSize = realDataChannelMessageLimit
}
if shape.MaxDelay < shape.MinDelay {
shape.MaxDelay = shape.MinDelay
}
p.trafficShape = shape
}
func NewPeer(roomURL, name string, onData func([]byte)) (*Peer, error) {
conn, err := GetConnectionInfo(roomURL, name)
if err != nil {
@@ -63,11 +101,20 @@ func NewPeer(roomURL, name string, onData func([]byte)) (*Peer, error) {
reconnectCh: make(chan struct{}, 1),
closeCh: make(chan struct{}),
keepAliveCh: make(chan struct{}),
telemetryCh: make(chan struct{}, 1),
sendQueue: make(chan []byte, 5000),
ackWaiters: make(map[string]chan struct{}),
trafficShape: TrafficShape{
MaxMessageSize: realDataChannelMessageLimit,
MinDelay: defaultSendDelayMin,
MaxDelay: defaultSendDelayMax,
},
}, nil
}
func (p *Peer) Connect(ctx context.Context) error {
p.closed.Store(false)
config := webrtc.Configuration{
ICEServers: []webrtc.ICEServer{
{URLs: []string{"stun:stun.rtc.yandex.net:3478"}},
@@ -89,7 +136,7 @@ func (p *Peer) Connect(ctx context.Context) error {
p.pcSub.OnConnectionStateChange(func(state webrtc.PeerConnectionState) {
log.Printf("Subscriber PeerConnection state: %s", state.String())
if state == webrtc.PeerConnectionStateFailed || state == webrtc.PeerConnectionStateDisconnected {
if !p.closed.Load() && (state == webrtc.PeerConnectionStateFailed || state == webrtc.PeerConnectionStateDisconnected) {
select {
case p.reconnectCh <- struct{}{}:
default:
@@ -104,7 +151,7 @@ func (p *Peer) Connect(ctx context.Context) error {
p.pcPub.OnConnectionStateChange(func(state webrtc.PeerConnectionState) {
log.Printf("Publisher PeerConnection state: %s", state.String())
if state == webrtc.PeerConnectionStateFailed || state == webrtc.PeerConnectionStateDisconnected {
if !p.closed.Load() && (state == webrtc.PeerConnectionStateFailed || state == webrtc.PeerConnectionStateDisconnected) {
select {
case p.reconnectCh <- struct{}{}:
default:
@@ -145,9 +192,11 @@ func (p *Peer) Connect(ctx context.Context) error {
log.Println("Calling reconnect callback for cleanup")
p.onReconnect(nil)
}
select {
case p.reconnectCh <- struct{}{}:
default:
if !p.closed.Load() {
select {
case p.reconnectCh <- struct{}{}:
default:
}
}
})
@@ -161,9 +210,11 @@ func (p *Peer) Connect(ctx context.Context) error {
log.Printf("Received datachannel: %s", dc.Label())
dc.OnClose(func() {
log.Println("Received DataChannel closed - triggering reconnect")
select {
case p.reconnectCh <- struct{}{}:
default:
if !p.closed.Load() {
select {
case p.reconnectCh <- struct{}{}:
default:
}
}
})
dc.OnMessage(func(msg webrtc.DataChannelMessage) {
@@ -289,9 +340,11 @@ func (p *Peer) handleSignaling() {
var msg map[string]interface{}
if err := p.ws.ReadJSON(&msg); err != nil {
log.Printf("WS read error: %v", err)
select {
case p.reconnectCh <- struct{}{}:
default:
if !p.closed.Load() {
select {
case p.reconnectCh <- struct{}{}:
default:
}
}
return
}
@@ -304,7 +357,12 @@ func (p *Peer) handleSignaling() {
uid, _ := msg["uid"].(string)
if _, ok := msg["serverHello"]; ok {
if _, ok := msg["ack"]; ok {
p.resolveAck(uid)
}
if serverHello, ok := msg["serverHello"].(map[string]interface{}); ok {
p.startTelemetry(serverHello)
p.sendAck(uid)
}
@@ -316,6 +374,11 @@ func (p *Peer) handleSignaling() {
p.sendAck(uid)
}
if isConferenceEndMessage(msg) {
p.signalEnded("conference ended")
return
}
if _, ok := msg["ping"]; ok {
p.sendPong(uid)
continue
@@ -430,6 +493,10 @@ func (p *Peer) handleICE(cand map[string]interface{}) {
}
func (p *Peer) sendAck(uid string) {
if uid == "" {
return
}
p.wsMu.Lock()
defer p.wsMu.Unlock()
@@ -443,6 +510,53 @@ func (p *Peer) sendAck(uid string) {
})
}
func (p *Peer) registerAckWaiter(uid string) chan struct{} {
ch := make(chan struct{})
p.ackMu.Lock()
p.ackWaiters[uid] = ch
p.ackMu.Unlock()
return ch
}
func (p *Peer) removeAckWaiter(uid string) {
p.ackMu.Lock()
delete(p.ackWaiters, uid)
p.ackMu.Unlock()
}
func (p *Peer) waitForAck(uid string, ch <-chan struct{}, timeout time.Duration) bool {
if uid == "" {
return false
}
defer func() {
p.removeAckWaiter(uid)
}()
select {
case <-ch:
return true
case <-time.After(timeout):
return false
case <-p.closeCh:
return false
}
}
func (p *Peer) resolveAck(uid string) {
if uid == "" {
return
}
p.ackMu.Lock()
ch := p.ackWaiters[uid]
if ch != nil {
delete(p.ackWaiters, uid)
close(ch)
}
p.ackMu.Unlock()
}
func (p *Peer) sendPong(uid string) {
p.wsMu.Lock()
defer p.wsMu.Unlock()
@@ -453,6 +567,149 @@ func (p *Peer) sendPong(uid string) {
})
}
func (p *Peer) startTelemetry(serverHello map[string]interface{}) {
cfg, ok := serverHello["telemetryConfiguration"].(map[string]interface{})
if !ok {
return
}
endpoint, _ := cfg["logEndpoint"].(string)
if endpoint == "" {
endpoint, _ = cfg["endpoint"].(string)
}
if endpoint == "" {
endpoint, _ = cfg["url"].(string)
}
if endpoint == "" {
logger.Verbose("Telemetry configuration has no endpoint; skipping XHR simulation")
return
}
interval := defaultTelemetryInterval
if raw, ok := cfg["sendingInterval"].(float64); ok && raw > 0 {
interval = time.Duration(raw) * time.Millisecond
}
if !p.telemetryActive.CompareAndSwap(false, true) {
return
}
p.wg.Add(1)
go func() {
defer p.wg.Done()
defer p.telemetryActive.Store(false)
ticker := time.NewTicker(interval)
defer ticker.Stop()
p.sendTelemetry(endpoint, "join")
for {
select {
case <-ticker.C:
p.sendTelemetry(endpoint, "stats")
case <-p.telemetryCh:
p.sendTelemetry(endpoint, "leave")
return
case <-p.closeCh:
p.sendTelemetry(endpoint, "leave")
return
}
}
}()
}
func (p *Peer) stopTelemetry() {
if p.telemetryActive.Load() {
select {
case p.telemetryCh <- struct{}{}:
default:
}
}
}
func (p *Peer) sendTelemetry(endpoint, event string) {
body, err := json.Marshal(map[string]interface{}{
"event": event,
"timestamp": time.Now().UnixMilli(),
"peerId": p.conn.PeerID,
"roomId": p.conn.RoomID,
"displayName": p.name,
"implementation": "olcrtc-go",
"dataChannel": map[string]interface{}{
"bufferedAmount": p.GetBufferedAmount(),
"sendQueue": len(p.sendQueue),
},
})
if err != nil {
return
}
req, err := http.NewRequest(http.MethodPost, endpoint, bytes.NewReader(body))
if err != nil {
logger.Verbose("Telemetry request skipped: %v", err)
return
}
req.Header.Set("Content-Type", "application/json")
req.Header.Set("User-Agent", "Mozilla/5.0 (X11; Linux x86_64; rv:149.0) Gecko/20100101 Firefox/149.0")
req.Header.Set("Origin", "https://telemost.yandex.ru")
req.Header.Set("Referer", p.roomURL)
req.Header.Set("X-Requested-With", "XMLHttpRequest")
req.Header.Set("Client-Instance-Id", uuid.New().String())
req.Header.Set("X-Telemost-Client-Version", "187.1.0")
req.Header.Set("Idempotency-Key", uuid.New().String())
client := protect.NewHTTPClient()
resp, err := client.Do(req)
if err != nil {
logger.Verbose("Telemetry send failed: %v", err)
return
}
defer resp.Body.Close()
if resp.StatusCode >= 400 {
logger.Verbose("Telemetry endpoint returned %s", resp.Status)
}
}
func (p *Peer) signalEnded(reason string) {
log.Printf("Conference ended: %s", reason)
p.closed.Store(true)
p.stopTelemetry()
if p.onEnded != nil {
p.onEnded(reason)
}
}
func isConferenceEndMessage(msg map[string]interface{}) bool {
for _, key := range []string{"conferenceClosed", "conferenceEnded", "roomClosed", "roomEnded", "callEnded"} {
if _, ok := msg[key]; ok {
return true
}
}
if raw, ok := msg["conference"].(map[string]interface{}); ok {
if state, _ := raw["state"].(string); isEndedState(state) {
return true
}
}
if raw, ok := msg["conferenceState"].(map[string]interface{}); ok {
if state, _ := raw["state"].(string); isEndedState(state) {
return true
}
}
return false
}
func isEndedState(state string) bool {
switch strings.ToLower(state) {
case "closed", "ended", "finished", "terminated":
return true
default:
return false
}
}
func (p *Peer) setupICEHandlers() {
p.pcSub.OnICECandidate(func(c *webrtc.ICECandidate) {
if c == nil {
@@ -495,36 +752,51 @@ func (p *Peer) setupICEHandlers() {
})
}
func (p *Peer) sendLeave() {
func (p *Peer) sendLeave(uid string) bool {
p.wsMu.Lock()
defer p.wsMu.Unlock()
if p.ws == nil {
log.Println("WebSocket already closed, cannot send leave")
return
return false
}
leave := map[string]interface{}{
"uid": uuid.New().String(),
"uid": uid,
"leave": map[string]interface{}{},
}
if err := p.ws.WriteJSON(leave); err != nil {
log.Printf("Failed to send leave: %v", err)
return false
} else {
log.Println("Sent leave message to server")
}
return true
}
func (p *Peer) Close() error {
log.Println("Closing peer connection...")
alreadyClosing := p.closed.Swap(true)
p.sendQueueClosed.Store(true)
log.Println("Sending leave message...")
p.sendLeave()
if !alreadyClosing {
log.Println("Sending leave message...")
leaveUID := uuid.New().String()
leaveAck := p.registerAckWaiter(leaveUID)
if p.sendLeave(leaveUID) {
if p.waitForAck(leaveUID, leaveAck, 1500*time.Millisecond) {
log.Println("Leave acknowledged")
} else {
log.Println("Leave ack timeout")
}
} else {
p.removeAckWaiter(leaveUID)
}
time.Sleep(1 * time.Second)
p.stopTelemetry()
}
log.Println("Closing channels...")
if p.closeCh != nil {
@@ -627,7 +899,7 @@ func (p *Peer) keepAlive() {
func (p *Peer) reconnect(ctx context.Context) error {
log.Println("Reconnecting...")
p.sendLeave()
p.sendLeave(uuid.New().String())
time.Sleep(500 * time.Millisecond)
close(p.keepAliveCh)
@@ -727,10 +999,20 @@ func (p *Peer) processSendQueue(workerID int) {
for {
select {
case data := <-p.sendQueue:
case data, ok := <-p.sendQueue:
if !ok {
return
}
if p.dc == nil || p.dc.ReadyState() != webrtc.DataChannelStateOpen {
continue
}
if p.trafficShape.MaxMessageSize > 0 && len(data) > p.trafficShape.MaxMessageSize {
log.Printf("[WORKER-%d] Refusing oversized DataChannel message size=%d limit=%d", workerID, len(data), p.trafficShape.MaxMessageSize)
continue
}
if delay := p.nextSendDelay(); delay > 0 {
time.Sleep(delay)
}
// Wait until SCTP buffer drains. Dropping here would corrupt the
// carried TCP streams (the mux is a reliable transport); large
@@ -804,3 +1086,15 @@ func (p *Peer) CanSend() bool {
}
return queueLen < 1000 && buffered < 3*1024*1024
}
func (p *Peer) nextSendDelay() time.Duration {
minDelay := p.trafficShape.MinDelay
maxDelay := p.trafficShape.MaxDelay
if maxDelay <= 0 {
return 0
}
if maxDelay <= minDelay {
return maxDelay
}
return minDelay + time.Duration(rand.Int64N(int64(maxDelay-minDelay)))
}

View File

@@ -0,0 +1,26 @@
package telemost
import "testing"
func TestIsConferenceEndMessage(t *testing.T) {
tests := []map[string]interface{}{
{"conferenceEnded": map[string]interface{}{}},
{"conference": map[string]interface{}{"state": "closed"}},
{"conferenceState": map[string]interface{}{"state": "TERMINATED"}},
}
for _, tt := range tests {
if !isConferenceEndMessage(tt) {
t.Fatalf("expected end message for %#v", tt)
}
}
}
func TestIsConferenceEndMessageIgnoresActiveState(t *testing.T) {
msg := map[string]interface{}{
"conference": map[string]interface{}{"state": "active"},
}
if isConferenceEndMessage(msg) {
t.Fatal("active conference state must not be treated as ended")
}
}