fix: isolate videochannel peers in shared rooms

This commit is contained in:
zarazaex69
2026-05-16 04:47:43 +03:00
parent 00a79b3c99
commit 6633c1ef8a
11 changed files with 163 additions and 36 deletions

View File

@@ -161,6 +161,7 @@ type Config struct {
URL string
Token string
RoomID string
ChannelID string
KeyHex string
SOCKSHost string
SOCKSPort int
@@ -643,6 +644,7 @@ func runOnce(
Transport: cfg.Transport,
Carrier: cfg.Auth,
RoomURL: roomURL,
ChannelID: cfg.ChannelID,
KeyHex: cfg.KeyHex,
DNSServer: cfg.DNSServer,
SOCKSProxyAddr: cfg.SOCKSProxyAddr,
@@ -687,6 +689,7 @@ func runOnce(
Transport: cfg.Transport,
Carrier: cfg.Auth,
RoomURL: roomURL,
ChannelID: cfg.ChannelID,
KeyHex: cfg.KeyHex,
LocalAddr: fmt.Sprintf("%s:%d", cfg.SOCKSHost, cfg.SOCKSPort),
DNSServer: cfg.DNSServer,

View File

@@ -79,6 +79,7 @@ type Config struct {
Transport string
Carrier string
RoomURL string
ChannelID string
KeyHex string
LocalAddr string
DNSServer string
@@ -198,6 +199,7 @@ func (c *Client) bringUpLink(
Engine: cfg.Engine,
URL: cfg.URL,
Token: cfg.Token,
ChannelID: cfg.ChannelID,
DeviceID: c.deviceID,
Name: names.Generate(),
OnData: c.onData,

View File

@@ -83,7 +83,8 @@ type Auth struct {
// Room identifies the conference room.
type Room struct {
ID string `yaml:"id"`
ID string `yaml:"id"`
Channel string `yaml:"channel"`
}
// Crypto holds the shared secret used to authenticate and encrypt the tunnel.
@@ -249,6 +250,7 @@ func Apply(dst session.Config, f File) session.Config {
dst.URL = pickString(dst.URL, f.Engine.URL)
dst.Token = pickString(dst.Token, f.Engine.Token)
dst.RoomID = pickString(dst.RoomID, f.Room.ID)
dst.ChannelID = pickString(dst.ChannelID, f.Room.Channel)
dst.KeyHex = pickString(dst.KeyHex, f.Crypto.Key)
dst.SOCKSHost = pickString(dst.SOCKSHost, f.SOCKS.Host)
dst.SOCKSPort = pickInt(dst.SOCKSPort, f.SOCKS.Port)
@@ -294,6 +296,7 @@ func ApplyProfile(base session.Config, p Profile) session.Config {
dst.URL = overlayString(dst.URL, p.Engine.URL)
dst.Token = overlayString(dst.Token, p.Engine.Token)
dst.RoomID = overlayString(dst.RoomID, p.Room.ID)
dst.ChannelID = overlayString(dst.ChannelID, p.Room.Channel)
dst.KeyHex = overlayString(dst.KeyHex, p.Crypto.Key)
dst.SOCKSHost = overlayString(dst.SOCKSHost, p.SOCKS.Host)
dst.SOCKSPort = overlayInt(dst.SOCKSPort, p.SOCKS.Port)

View File

@@ -10,6 +10,7 @@ import (
"fmt"
"io"
"net"
"os"
"strconv"
"strings"
"sync"
@@ -760,6 +761,7 @@ func startRealTunnel(
session.RegisterDefaults()
socksAddr := freeLocalAddr(ctx, t)
channelID := fmt.Sprintf("e2e-%d-%d", os.Getpid(), time.Now().UnixNano())
runCtx, cancel := context.WithCancel(ctx)
t.Cleanup(cancel)
@@ -771,6 +773,7 @@ func startRealTunnel(
Transport: transportName,
Carrier: carrierName,
RoomURL: roomURL,
ChannelID: channelID,
KeyHex: testKeyHex,
DNSServer: localDNSServer,
VideoWidth: 1080,
@@ -810,6 +813,7 @@ func startRealTunnel(
Transport: transportName,
Carrier: carrierName,
RoomURL: roomURL,
ChannelID: channelID,
KeyHex: testKeyHex,
DeviceID: clientDeviceID,
LocalAddr: socksAddr,

View File

@@ -21,6 +21,7 @@ func New(ctx context.Context, cfg link.Config) (link.Link, error) {
Engine: cfg.Engine,
URL: cfg.URL,
Token: cfg.Token,
ChannelID: cfg.ChannelID,
DeviceID: cfg.DeviceID,
Name: cfg.Name,
OnData: cfg.OnData,

View File

@@ -42,6 +42,7 @@ type Config struct {
Engine string
URL string
Token string
ChannelID string
DeviceID string
Name string
OnData func([]byte)

View File

@@ -93,6 +93,7 @@ type Config struct {
Transport string
Carrier string
RoomURL string
ChannelID string
KeyHex string
DNSServer string
SOCKSProxyAddr string
@@ -274,6 +275,7 @@ func (s *Server) bringUpLink(
Engine: cfg.Engine,
URL: cfg.URL,
Token: cfg.Token,
ChannelID: cfg.ChannelID,
DeviceID: "",
Name: names.Generate(),
OnData: s.onData,

View File

@@ -49,6 +49,7 @@ type Config struct {
Engine string
URL string
Token string
ChannelID string
DeviceID string
Name string
OnData func([]byte)

View File

@@ -7,9 +7,21 @@ import (
const (
protocolMagic uint32 = 0x4f565632 // OVV2
protocolVersion byte = 1
protocolVersion byte = 2
frameTypeData byte = 1
frameTypeAck byte = 2
frameRoleAny byte = 0
frameRoleServer byte = 1
frameRoleClient byte = 2
frameBindingOff = 7
frameSeqOff = 11
frameCRCOff = 15
frameAckLen = 19
frameTotalLenOff = 19
frameFragIdxOff = 23
frameFragTotalOff = 25
frameDataHdrLen = 27
)
var (
@@ -29,6 +41,8 @@ var (
type transportFrame struct {
typ byte
role byte
binding uint32
seq uint32
crc uint32
totalLen uint32
@@ -65,26 +79,52 @@ func fragmentPayload(data []byte, maxSize int) [][]byte {
}
func encodeDataFrame(seq, crc uint32, totalLen, fragIdx, fragTotal int, payload []byte) []byte {
out := make([]byte, 22+len(payload))
return encodeDataFrameForBinding(frameRoleAny, 0, seq, crc, totalLen, fragIdx, fragTotal, payload)
}
func encodeDataFrameForRole(role byte, seq, crc uint32, totalLen, fragIdx, fragTotal int, payload []byte) []byte {
return encodeDataFrameForBinding(role, 0, seq, crc, totalLen, fragIdx, fragTotal, payload)
}
func encodeDataFrameForBinding(
role byte,
binding uint32,
seq, crc uint32,
totalLen, fragIdx, fragTotal int,
payload []byte,
) []byte {
out := make([]byte, frameDataHdrLen+len(payload))
binary.BigEndian.PutUint32(out[0:4], protocolMagic)
out[4] = protocolVersion
out[5] = frameTypeData
binary.BigEndian.PutUint32(out[6:10], seq)
binary.BigEndian.PutUint32(out[10:14], crc)
binary.BigEndian.PutUint32(out[14:18], uint32(totalLen)) //nolint:gosec,lll // G115: bounded conversion verified by surrounding logic
binary.BigEndian.PutUint16(out[18:20], uint16(fragIdx)) //nolint:gosec,lll // G115: bounded conversion verified by surrounding logic
binary.BigEndian.PutUint16(out[20:22], uint16(fragTotal)) //nolint:gosec,lll // G115: bounded conversion verified by surrounding logic
copy(out[22:], payload)
out[6] = role
binary.BigEndian.PutUint32(out[frameBindingOff:frameSeqOff], binding)
binary.BigEndian.PutUint32(out[frameSeqOff:frameCRCOff], seq)
binary.BigEndian.PutUint32(out[frameCRCOff:frameAckLen], crc)
binary.BigEndian.PutUint32(out[frameTotalLenOff:frameFragIdxOff], uint32(totalLen)) //nolint:gosec,lll // G115: bounded conversion verified by surrounding logic
binary.BigEndian.PutUint16(out[frameFragIdxOff:frameFragTotalOff], uint16(fragIdx)) //nolint:gosec,lll // G115: bounded conversion verified by surrounding logic
binary.BigEndian.PutUint16(out[frameFragTotalOff:frameDataHdrLen], uint16(fragTotal)) //nolint:gosec,lll // G115: bounded conversion verified by surrounding logic
copy(out[frameDataHdrLen:], payload)
return out
}
func encodeAckFrame(seq, crc uint32) []byte {
out := make([]byte, 14)
return encodeAckFrameForBinding(frameRoleAny, 0, seq, crc)
}
func encodeAckFrameForRole(role byte, seq, crc uint32) []byte {
return encodeAckFrameForBinding(role, 0, seq, crc)
}
func encodeAckFrameForBinding(role byte, binding, seq, crc uint32) []byte {
out := make([]byte, frameAckLen)
binary.BigEndian.PutUint32(out[0:4], protocolMagic)
out[4] = protocolVersion
out[5] = frameTypeAck
binary.BigEndian.PutUint32(out[6:10], seq)
binary.BigEndian.PutUint32(out[10:14], crc)
out[6] = role
binary.BigEndian.PutUint32(out[frameBindingOff:frameSeqOff], binding)
binary.BigEndian.PutUint32(out[frameSeqOff:frameCRCOff], seq)
binary.BigEndian.PutUint32(out[frameCRCOff:frameAckLen], crc)
return out
}
@@ -100,24 +140,36 @@ func decodeTransportFrame(data []byte) (transportFrame, error) {
}
frame := transportFrame{typ: data[5]}
if len(data) < frameSeqOff {
switch frame.typ {
case frameTypeAck:
return transportFrame{}, ErrAckTooShort
case frameTypeData:
return transportFrame{}, ErrDataTooShort
default:
return transportFrame{}, ErrUnexpectedFrameType
}
}
frame.role = data[6]
frame.binding = binary.BigEndian.Uint32(data[frameBindingOff:frameSeqOff])
switch frame.typ {
case frameTypeAck:
if len(data) < 14 {
if len(data) < frameAckLen {
return transportFrame{}, ErrAckTooShort
}
frame.seq = binary.BigEndian.Uint32(data[6:10])
frame.crc = binary.BigEndian.Uint32(data[10:14])
frame.seq = binary.BigEndian.Uint32(data[frameSeqOff:frameCRCOff])
frame.crc = binary.BigEndian.Uint32(data[frameCRCOff:frameAckLen])
return frame, nil
case frameTypeData:
if len(data) < 22 {
if len(data) < frameDataHdrLen {
return transportFrame{}, ErrDataTooShort
}
frame.seq = binary.BigEndian.Uint32(data[6:10])
frame.crc = binary.BigEndian.Uint32(data[10:14])
frame.totalLen = binary.BigEndian.Uint32(data[14:18])
frame.fragIdx = binary.BigEndian.Uint16(data[18:20])
frame.fragTotal = binary.BigEndian.Uint16(data[20:22])
frame.payload = append([]byte(nil), data[22:]...)
frame.seq = binary.BigEndian.Uint32(data[frameSeqOff:frameCRCOff])
frame.crc = binary.BigEndian.Uint32(data[frameCRCOff:frameAckLen])
frame.totalLen = binary.BigEndian.Uint32(data[frameTotalLenOff:frameFragIdxOff])
frame.fragIdx = binary.BigEndian.Uint16(data[frameFragIdxOff:frameFragTotalOff])
frame.fragTotal = binary.BigEndian.Uint16(data[frameFragTotalOff:frameDataHdrLen])
frame.payload = append([]byte(nil), data[frameDataHdrLen:]...)
return frame, nil
default:
return transportFrame{}, ErrUnexpectedFrameType

View File

@@ -45,8 +45,8 @@ type streamTransport struct {
codec codecSpec
encoder *ffmpegEncoder
encoderMu sync.Mutex
decoder *ffmpegDecoder
decoderMu sync.Mutex
decoders map[*ffmpegDecoder]struct{}
onData func([]byte)
outbound chan []byte
outboundAck chan []byte
@@ -72,6 +72,9 @@ type streamTransport struct {
videoCodec string
videoTileModule int
videoTileRS int
localRole byte
remoteRole byte
bindingToken uint32
runCtx context.Context //nolint:containedctx,lll // long-lived context drives idle-frame loops bound to this transport's lifetime
idleFrame []byte
@@ -138,6 +141,7 @@ func New(ctx context.Context, cfg transport.Config) (transport.Transport, error)
outboundAck: make(chan []byte, 64),
closeCh: make(chan struct{}),
writerDone: make(chan struct{}),
decoders: make(map[*ffmpegDecoder]struct{}),
ackWaiters: make(map[uint32]chan uint32),
inbound: make(map[uint32]*inboundMessage),
delivered: make(map[uint32]uint32),
@@ -151,6 +155,9 @@ func New(ctx context.Context, cfg transport.Config) (transport.Transport, error)
videoCodec: cfg.VideoCodec,
videoTileModule: tileModule,
videoTileRS: tileRS,
localRole: localFrameRole(cfg.DeviceID),
remoteRole: remoteFrameRole(cfg.DeviceID),
bindingToken: bindingToken(cfg.ChannelID),
runCtx: ctx,
}
@@ -222,7 +229,7 @@ func (p *streamTransport) Send(data []byte) error {
for range maxSendAttempts {
for idx, fragment := range fragments {
frame := encodeDataFrame(seq, crc, len(data), idx, len(fragments), fragment)
frame := encodeDataFrameForBinding(p.localRole, p.bindingToken, seq, crc, len(data), idx, len(fragments), fragment)
if err := p.enqueueFrame(frame, false); err != nil {
return err
}
@@ -257,9 +264,10 @@ func (p *streamTransport) Close() error {
p.encoderMu.Unlock()
p.decoderMu.Lock()
if p.decoder != nil {
_ = p.decoder.Close()
for decoder := range p.decoders {
_ = decoder.Close()
}
p.decoders = nil
p.decoderMu.Unlock()
if p.writerUp.Load() {
@@ -445,8 +453,8 @@ func (p *streamTransport) enqueueFrame(frame []byte, priority bool) error {
func (p *streamTransport) popDecoderFrames(decoder *ffmpegDecoder) {
defer func() {
p.decoderMu.Lock()
if p.decoder == decoder {
p.decoder = nil
if p.decoders != nil {
delete(p.decoders, decoder)
}
p.decoderMu.Unlock()
_ = decoder.Close()
@@ -511,15 +519,12 @@ func (p *streamTransport) handleRemoteTrack(track *webrtc.TrackRemote, _ *webrtc
}
p.decoderMu.Lock()
if p.closed.Load() {
if p.closed.Load() || p.decoders == nil {
p.decoderMu.Unlock()
_ = decoder.Close()
return
}
if p.decoder != nil {
_ = p.decoder.Close()
}
p.decoder = decoder
p.decoders[decoder] = struct{}{}
p.decoderMu.Unlock()
go p.popDecoderFrames(decoder)
@@ -542,6 +547,9 @@ func (p *streamTransport) handleFrame(frame []byte) {
logger.Debugf("videochannel decode transport frame error: %v", err)
return
}
if !p.acceptFrame(decoded) {
return
}
switch decoded.typ {
case frameTypeAck:
@@ -620,7 +628,7 @@ func (p *streamTransport) handleInboundFrame(frame transportFrame) {
}
func (p *streamTransport) sendAck(seq, crc uint32) {
_ = p.enqueueFrame(encodeAckFrame(seq, crc), true)
_ = p.enqueueFrame(encodeAckFrameForBinding(p.localRole, p.bindingToken, seq, crc), true)
}
func (p *streamTransport) resolveAck(seq, crc uint32) {
@@ -648,3 +656,31 @@ func randomID() string {
}
return hex.EncodeToString(b[:])
}
func localFrameRole(deviceID string) byte {
if deviceID == "" {
return frameRoleServer
}
return frameRoleClient
}
func remoteFrameRole(deviceID string) byte {
if deviceID == "" {
return frameRoleClient
}
return frameRoleServer
}
func bindingToken(channelID string) uint32 {
token := crc32.ChecksumIEEE([]byte(channelID))
if token == 0 && channelID != "" {
token = 1
}
return token
}
func (p *streamTransport) acceptFrame(frame transportFrame) bool {
roleOK := frame.role == frameRoleAny || frame.role == p.remoteRole
bindingOK := frame.binding == 0 || frame.binding == p.bindingToken
return roleOK && bindingOK
}

View File

@@ -62,12 +62,13 @@ func TestTileIdleFrameIgnored(t *testing.T) {
}
func TestTransportFrameRoundTrip(t *testing.T) {
encoded := encodeDataFrame(42, 0xdeadbeef, 1024, 1, 3, []byte("chunk"))
encoded := encodeDataFrameForBinding(frameRoleClient, 0x12345678, 42, 0xdeadbeef, 1024, 1, 3, []byte("chunk"))
decoded, err := decodeTransportFrame(encoded)
if err != nil {
t.Fatalf("decodeTransportFrame failed: %v", err)
}
if decoded.typ != frameTypeData || decoded.seq != 42 || decoded.crc != 0xdeadbeef {
if decoded.typ != frameTypeData || decoded.role != frameRoleClient ||
decoded.binding != 0x12345678 || decoded.seq != 42 || decoded.crc != 0xdeadbeef {
t.Fatalf("unexpected frame header: %+v", decoded)
}
if decoded.totalLen != 1024 || decoded.fragIdx != 1 || decoded.fragTotal != 3 {
@@ -77,3 +78,24 @@ func TestTransportFrameRoundTrip(t *testing.T) {
t.Fatalf("payload mismatch: got=%q", decoded.payload)
}
}
func TestAcceptFrameRole(t *testing.T) {
server := &streamTransport{remoteRole: frameRoleClient, bindingToken: 10}
if !server.acceptFrame(transportFrame{role: frameRoleClient, binding: 10}) {
t.Fatal("server rejected client frame")
}
if server.acceptFrame(transportFrame{role: frameRoleServer, binding: 10}) {
t.Fatal("server accepted server frame")
}
if server.acceptFrame(transportFrame{role: frameRoleClient, binding: 11}) {
t.Fatal("server accepted different binding")
}
client := &streamTransport{remoteRole: frameRoleServer, bindingToken: 20}
if !client.acceptFrame(transportFrame{role: frameRoleServer, binding: 20}) {
t.Fatal("client rejected server frame")
}
if client.acceptFrame(transportFrame{role: frameRoleClient, binding: 20}) {
t.Fatal("client accepted client frame")
}
}