diff --git a/internal/transport/vp8channel/transport.go b/internal/transport/vp8channel/transport.go index 8176b00..ee1718d 100644 --- a/internal/transport/vp8channel/transport.go +++ b/internal/transport/vp8channel/transport.go @@ -13,6 +13,7 @@ import ( "fmt" "hash/crc32" "hash/fnv" + "strconv" "sync" "sync/atomic" "time" @@ -89,6 +90,7 @@ type streamTransport struct { stream videoSession track *webrtc.TrackLocalStaticSample onData func([]byte) + onPeerData func(peerID string, data []byte) outbound chan []byte closeCh chan struct{} writerDone chan struct{} @@ -112,6 +114,12 @@ type streamTransport struct { kcpMu sync.RWMutex reconnectMu sync.Mutex reconnectFn func() + + // Multi-peer support: when onPeerData is set, each remote epoch gets + // its own KCP runtime and data is routed via onPeerData(peerID, ...). + peersMu sync.RWMutex + peers map[uint32]*kcpRuntime // epoch → KCP runtime + peerOut map[uint32]chan []byte // epoch → outbound queue } // New creates a vp8channel transport backed by a carrier engine. @@ -170,6 +178,7 @@ func New(ctx context.Context, cfg transport.Config) (transport.Transport, error) stream: stream, track: track, onData: cfg.OnData, + onPeerData: cfg.OnPeerData, outbound: make(chan []byte, outboundQueueSize), closeCh: make(chan struct{}), writerDone: make(chan struct{}), @@ -177,6 +186,8 @@ func New(ctx context.Context, cfg transport.Config) (transport.Transport, error) batchSize: batchSize, bindingToken: bindingToken(cfg.RoomURL), localEpoch: randomEpoch(), + peers: make(map[uint32]*kcpRuntime), + peerOut: make(map[uint32]chan []byte), } if err := stream.AddTrack(track); err != nil { @@ -313,6 +324,29 @@ func (p *streamTransport) Send(data []byte) error { return rt.send(data) } +// SendTo transmits data to a specific peer identified by its epoch hex string. +func (p *streamTransport) SendTo(peerID string, data []byte) error { + if p.closed.Load() { + return ErrTransportClosed + } + epoch, err := parsePeerID(peerID) + if err != nil { + return fmt.Errorf("vp8channel: invalid peerID %q: %w", peerID, err) + } + p.peersMu.RLock() + rt := p.peers[epoch] + p.peersMu.RUnlock() + if rt == nil { + return ErrTransportClosed + } + return rt.send(data) +} + +// SupportsPeerRouting reports whether this transport can address individual peers. +func (p *streamTransport) SupportsPeerRouting() bool { + return p.onPeerData != nil +} + func (p *streamTransport) Close() error { if p.closed.CompareAndSwap(false, true) { close(p.closeCh) @@ -324,6 +358,14 @@ func (p *streamTransport) Close() error { rt.close() } + p.peersMu.Lock() + for _, prt := range p.peers { + prt.close() + } + p.peers = make(map[uint32]*kcpRuntime) + p.peerOut = make(map[uint32]chan []byte) + p.peersMu.Unlock() + if p.writerUp.Load() { <-p.writerDone } @@ -629,6 +671,12 @@ func (p *streamTransport) handleIncomingFrame(frame []byte) { return } + // Multi-peer mode: route each epoch to its own KCP runtime. + if p.onPeerData != nil { + p.handlePeerFrame(peerEpoch, kcpPayload) + return + } + if !p.hadPeer.Swap(true) { p.handleFirstPeer(peerEpoch) } else if prev := p.peerEpoch.Load(); prev != peerEpoch { @@ -659,6 +707,91 @@ func (p *streamTransport) handleIncomingFrame(frame []byte) { } } +// handlePeerFrame routes incoming KCP data to a per-peer KCP runtime, +// creating one on demand. Each peer epoch gets its own independent KCP +// session so multiple clients can coexist in the same room. +func (p *streamTransport) handlePeerFrame(peerEpoch uint32, kcpPayload []byte) { + if len(kcpPayload) == 0 { + // Keepalive — ensure peer is registered but nothing to deliver. + p.getOrCreatePeerKCP(peerEpoch) + return + } + + rt := p.getOrCreatePeerKCP(peerEpoch) + if rt != nil { + deliverKCPPayload(rt, kcpPayload) + } +} + +func (p *streamTransport) getOrCreatePeerKCP(epoch uint32) *kcpRuntime { + p.peersMu.RLock() + rt := p.peers[epoch] + p.peersMu.RUnlock() + if rt != nil { + return rt + } + + p.peersMu.Lock() + defer p.peersMu.Unlock() + + // Double-check after acquiring write lock. + if rt = p.peers[epoch]; rt != nil { + return rt + } + + peerID := formatPeerID(epoch) + out := make(chan []byte, outboundQueueSize) + hdr := buildEpochHeader(p.bindingToken, p.localEpochValue()) + rt, err := startKCP(out, func(data []byte) { + if p.onPeerData != nil { + p.onPeerData(peerID, data) + } + }, hdr) + if err != nil { + logger.Warnf("vp8channel: startKCP for peer 0x%08x failed: %v", epoch, err) + return nil + } + p.peers[epoch] = rt + p.peerOut[epoch] = out + logger.Infof("vp8channel: peer session created epoch=0x%08x", epoch) + + // Pump outbound frames from this peer's queue into the writer. + go p.peerWriterPump(epoch, out) + + return rt +} + +// peerWriterPump drains a peer's outbound KCP queue and writes frames to the +// shared video track. Stops when the channel is closed or transport shuts down. +func (p *streamTransport) peerWriterPump(epoch uint32, out chan []byte) { + for { + select { + case <-p.closeCh: + return + case frame, ok := <-out: + if !ok { + return + } + _ = p.track.WriteSample(media.Sample{ + Data: frame, + Duration: p.frameInterval, + }) + } + } +} + +func formatPeerID(epoch uint32) string { + return fmt.Sprintf("%08x", epoch) +} + +func parsePeerID(peerID string) (uint32, error) { + v, err := strconv.ParseUint(peerID, 16, 32) + if err != nil { + return 0, err + } + return uint32(v), nil //nolint:gosec // G115: bounded by ParseUint bitSize=32 +} + func deliverKCPPayload(rt *kcpRuntime, payload []byte) { if rt == nil || len(payload) == 0 { return