fix(vp8channel): support multiple simultaneous clients in same room

Implement PeerTransport interface (SendTo/SupportsPeerRouting) so the
server can route KCP traffic to individual peers by their epoch.

When OnPeerData is set (server mode), each remote epoch gets its own
KCP runtime instead of triggering a reconnect loop.

Also add DNS retry in protect.NewHTTPClient to handle transient
resolver failures.

Fixes #67
This commit is contained in:
zarazaex69
2026-05-24 17:27:00 +03:00
parent cb6fe0980d
commit cefd260e5d

View File

@@ -13,6 +13,7 @@ import (
"fmt"
"hash/crc32"
"hash/fnv"
"strconv"
"sync"
"sync/atomic"
"time"
@@ -89,6 +90,7 @@ type streamTransport struct {
stream videoSession
track *webrtc.TrackLocalStaticSample
onData func([]byte)
onPeerData func(peerID string, data []byte)
outbound chan []byte
closeCh chan struct{}
writerDone chan struct{}
@@ -112,6 +114,12 @@ type streamTransport struct {
kcpMu sync.RWMutex
reconnectMu sync.Mutex
reconnectFn func()
// Multi-peer support: when onPeerData is set, each remote epoch gets
// its own KCP runtime and data is routed via onPeerData(peerID, ...).
peersMu sync.RWMutex
peers map[uint32]*kcpRuntime // epoch → KCP runtime
peerOut map[uint32]chan []byte // epoch → outbound queue
}
// New creates a vp8channel transport backed by a carrier engine.
@@ -170,6 +178,7 @@ func New(ctx context.Context, cfg transport.Config) (transport.Transport, error)
stream: stream,
track: track,
onData: cfg.OnData,
onPeerData: cfg.OnPeerData,
outbound: make(chan []byte, outboundQueueSize),
closeCh: make(chan struct{}),
writerDone: make(chan struct{}),
@@ -177,6 +186,8 @@ func New(ctx context.Context, cfg transport.Config) (transport.Transport, error)
batchSize: batchSize,
bindingToken: bindingToken(cfg.RoomURL),
localEpoch: randomEpoch(),
peers: make(map[uint32]*kcpRuntime),
peerOut: make(map[uint32]chan []byte),
}
if err := stream.AddTrack(track); err != nil {
@@ -313,6 +324,29 @@ func (p *streamTransport) Send(data []byte) error {
return rt.send(data)
}
// SendTo transmits data to a specific peer identified by its epoch hex string.
func (p *streamTransport) SendTo(peerID string, data []byte) error {
if p.closed.Load() {
return ErrTransportClosed
}
epoch, err := parsePeerID(peerID)
if err != nil {
return fmt.Errorf("vp8channel: invalid peerID %q: %w", peerID, err)
}
p.peersMu.RLock()
rt := p.peers[epoch]
p.peersMu.RUnlock()
if rt == nil {
return ErrTransportClosed
}
return rt.send(data)
}
// SupportsPeerRouting reports whether this transport can address individual peers.
func (p *streamTransport) SupportsPeerRouting() bool {
return p.onPeerData != nil
}
func (p *streamTransport) Close() error {
if p.closed.CompareAndSwap(false, true) {
close(p.closeCh)
@@ -324,6 +358,14 @@ func (p *streamTransport) Close() error {
rt.close()
}
p.peersMu.Lock()
for _, prt := range p.peers {
prt.close()
}
p.peers = make(map[uint32]*kcpRuntime)
p.peerOut = make(map[uint32]chan []byte)
p.peersMu.Unlock()
if p.writerUp.Load() {
<-p.writerDone
}
@@ -629,6 +671,12 @@ func (p *streamTransport) handleIncomingFrame(frame []byte) {
return
}
// Multi-peer mode: route each epoch to its own KCP runtime.
if p.onPeerData != nil {
p.handlePeerFrame(peerEpoch, kcpPayload)
return
}
if !p.hadPeer.Swap(true) {
p.handleFirstPeer(peerEpoch)
} else if prev := p.peerEpoch.Load(); prev != peerEpoch {
@@ -659,6 +707,91 @@ func (p *streamTransport) handleIncomingFrame(frame []byte) {
}
}
// handlePeerFrame routes incoming KCP data to a per-peer KCP runtime,
// creating one on demand. Each peer epoch gets its own independent KCP
// session so multiple clients can coexist in the same room.
func (p *streamTransport) handlePeerFrame(peerEpoch uint32, kcpPayload []byte) {
if len(kcpPayload) == 0 {
// Keepalive — ensure peer is registered but nothing to deliver.
p.getOrCreatePeerKCP(peerEpoch)
return
}
rt := p.getOrCreatePeerKCP(peerEpoch)
if rt != nil {
deliverKCPPayload(rt, kcpPayload)
}
}
func (p *streamTransport) getOrCreatePeerKCP(epoch uint32) *kcpRuntime {
p.peersMu.RLock()
rt := p.peers[epoch]
p.peersMu.RUnlock()
if rt != nil {
return rt
}
p.peersMu.Lock()
defer p.peersMu.Unlock()
// Double-check after acquiring write lock.
if rt = p.peers[epoch]; rt != nil {
return rt
}
peerID := formatPeerID(epoch)
out := make(chan []byte, outboundQueueSize)
hdr := buildEpochHeader(p.bindingToken, p.localEpochValue())
rt, err := startKCP(out, func(data []byte) {
if p.onPeerData != nil {
p.onPeerData(peerID, data)
}
}, hdr)
if err != nil {
logger.Warnf("vp8channel: startKCP for peer 0x%08x failed: %v", epoch, err)
return nil
}
p.peers[epoch] = rt
p.peerOut[epoch] = out
logger.Infof("vp8channel: peer session created epoch=0x%08x", epoch)
// Pump outbound frames from this peer's queue into the writer.
go p.peerWriterPump(epoch, out)
return rt
}
// peerWriterPump drains a peer's outbound KCP queue and writes frames to the
// shared video track. Stops when the channel is closed or transport shuts down.
func (p *streamTransport) peerWriterPump(epoch uint32, out chan []byte) {
for {
select {
case <-p.closeCh:
return
case frame, ok := <-out:
if !ok {
return
}
_ = p.track.WriteSample(media.Sample{
Data: frame,
Duration: p.frameInterval,
})
}
}
}
func formatPeerID(epoch uint32) string {
return fmt.Sprintf("%08x", epoch)
}
func parsePeerID(peerID string) (uint32, error) {
v, err := strconv.ParseUint(peerID, 16, 32)
if err != nil {
return 0, err
}
return uint32(v), nil //nolint:gosec // G115: bounded by ParseUint bitSize=32
}
func deliverKCPPayload(rt *kcpRuntime, payload []byte) {
if rt == nil || len(payload) == 0 {
return