mirror of
https://github.com/openlibrecommunity/olcrtc.git
synced 2026-05-29 16:39:45 +00:00
feat(peer): Add send queue for async datachannel writes
This commit is contained in:
@@ -6,10 +6,12 @@ import (
|
||||
"log"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/google/uuid"
|
||||
"github.com/gorilla/websocket"
|
||||
"github.com/openlibrecommunity/olcrtc/internal/logger"
|
||||
"github.com/pion/webrtc/v4"
|
||||
)
|
||||
|
||||
@@ -30,6 +32,8 @@ type Peer struct {
|
||||
lastReconnect time.Time
|
||||
reconnectCount int
|
||||
reconnectMu sync.Mutex
|
||||
sendQueue chan []byte
|
||||
sendQueueClosed atomic.Bool
|
||||
}
|
||||
|
||||
func NewPeer(roomURL, name string, onData func([]byte)) (*Peer, error) {
|
||||
@@ -46,6 +50,7 @@ func NewPeer(roomURL, name string, onData func([]byte)) (*Peer, error) {
|
||||
reconnectCh: make(chan struct{}, 1),
|
||||
closeCh: make(chan struct{}),
|
||||
keepAliveCh: make(chan struct{}),
|
||||
sendQueue: make(chan []byte, 1000),
|
||||
}, nil
|
||||
}
|
||||
|
||||
@@ -99,6 +104,7 @@ func (p *Peer) Connect(ctx context.Context) error {
|
||||
dcReady := make(chan struct{})
|
||||
p.dc.OnOpen(func() {
|
||||
log.Println("DataChannel opened")
|
||||
go p.processSendQueue()
|
||||
close(dcReady)
|
||||
})
|
||||
|
||||
@@ -174,11 +180,16 @@ func (p *Peer) Send(data []byte) error {
|
||||
return fmt.Errorf("datachannel not ready")
|
||||
}
|
||||
|
||||
for p.dc.BufferedAmount() > 256*1024 {
|
||||
time.Sleep(1 * time.Millisecond)
|
||||
if p.sendQueueClosed.Load() {
|
||||
return fmt.Errorf("send queue closed")
|
||||
}
|
||||
|
||||
return p.dc.Send(data)
|
||||
select {
|
||||
case p.sendQueue <- data:
|
||||
return nil
|
||||
default:
|
||||
return fmt.Errorf("send queue full")
|
||||
}
|
||||
}
|
||||
|
||||
func (p *Peer) sendHello() error {
|
||||
@@ -621,3 +632,22 @@ func (p *Peer) WatchConnection(ctx context.Context) {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (p *Peer) processSendQueue() {
|
||||
for {
|
||||
select {
|
||||
case data := <-p.sendQueue:
|
||||
for p.dc != nil && p.dc.BufferedAmount() > 256*1024 {
|
||||
time.Sleep(1 * time.Millisecond)
|
||||
}
|
||||
|
||||
if p.dc != nil && p.dc.ReadyState() == webrtc.DataChannelStateOpen {
|
||||
if err := p.dc.Send(data); err != nil {
|
||||
logger.Debug("DataChannel send error: %v", err)
|
||||
}
|
||||
}
|
||||
case <-p.closeCh:
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user