From 03217843469e10ff0dac0a2d9f42e390750aad85 Mon Sep 17 00:00:00 2001 From: zarazaex69 Date: Thu, 9 Apr 2026 18:57:12 +0300 Subject: [PATCH] feat(peer): Add send queue for async datachannel writes --- internal/telemost/peer.go | 36 +++++++++++++++++++++++++++++++++--- 1 file changed, 33 insertions(+), 3 deletions(-) diff --git a/internal/telemost/peer.go b/internal/telemost/peer.go index 9109711..0a233d9 100644 --- a/internal/telemost/peer.go +++ b/internal/telemost/peer.go @@ -6,10 +6,12 @@ import ( "log" "strings" "sync" + "sync/atomic" "time" "github.com/google/uuid" "github.com/gorilla/websocket" + "github.com/openlibrecommunity/olcrtc/internal/logger" "github.com/pion/webrtc/v4" ) @@ -30,6 +32,8 @@ type Peer struct { lastReconnect time.Time reconnectCount int reconnectMu sync.Mutex + sendQueue chan []byte + sendQueueClosed atomic.Bool } func NewPeer(roomURL, name string, onData func([]byte)) (*Peer, error) { @@ -46,6 +50,7 @@ func NewPeer(roomURL, name string, onData func([]byte)) (*Peer, error) { reconnectCh: make(chan struct{}, 1), closeCh: make(chan struct{}), keepAliveCh: make(chan struct{}), + sendQueue: make(chan []byte, 1000), }, nil } @@ -99,6 +104,7 @@ func (p *Peer) Connect(ctx context.Context) error { dcReady := make(chan struct{}) p.dc.OnOpen(func() { log.Println("DataChannel opened") + go p.processSendQueue() close(dcReady) }) @@ -174,11 +180,16 @@ func (p *Peer) Send(data []byte) error { return fmt.Errorf("datachannel not ready") } - for p.dc.BufferedAmount() > 256*1024 { - time.Sleep(1 * time.Millisecond) + if p.sendQueueClosed.Load() { + return fmt.Errorf("send queue closed") } - return p.dc.Send(data) + select { + case p.sendQueue <- data: + return nil + default: + return fmt.Errorf("send queue full") + } } func (p *Peer) sendHello() error { @@ -621,3 +632,22 @@ func (p *Peer) WatchConnection(ctx context.Context) { } } } + +func (p *Peer) processSendQueue() { + for { + select { + case data := <-p.sendQueue: + for p.dc != nil && p.dc.BufferedAmount() > 256*1024 { + time.Sleep(1 * time.Millisecond) + } + + if p.dc != nil && p.dc.ReadyState() == webrtc.DataChannelStateOpen { + if err := p.dc.Send(data); err != nil { + logger.Debug("DataChannel send error: %v", err) + } + } + case <-p.closeCh: + return + } + } +}