From 6633c1ef8a3cad31e20d7ef3aca742fa79af9753 Mon Sep 17 00:00:00 2001 From: zarazaex69 Date: Sat, 16 May 2026 04:47:43 +0300 Subject: [PATCH] fix: isolate videochannel peers in shared rooms --- internal/app/session/session.go | 3 + internal/client/client.go | 2 + internal/config/config.go | 5 +- internal/e2e/tunnel_test.go | 4 + internal/link/direct/direct.go | 1 + internal/link/link.go | 1 + internal/server/server.go | 2 + internal/transport/transport.go | 1 + internal/transport/videochannel/frame.go | 94 ++++++++++++++----- internal/transport/videochannel/transport.go | 60 +++++++++--- .../transport/videochannel/transport_test.go | 26 ++++- 11 files changed, 163 insertions(+), 36 deletions(-) diff --git a/internal/app/session/session.go b/internal/app/session/session.go index 4925e05..0dd44bd 100644 --- a/internal/app/session/session.go +++ b/internal/app/session/session.go @@ -161,6 +161,7 @@ type Config struct { URL string Token string RoomID string + ChannelID string KeyHex string SOCKSHost string SOCKSPort int @@ -643,6 +644,7 @@ func runOnce( Transport: cfg.Transport, Carrier: cfg.Auth, RoomURL: roomURL, + ChannelID: cfg.ChannelID, KeyHex: cfg.KeyHex, DNSServer: cfg.DNSServer, SOCKSProxyAddr: cfg.SOCKSProxyAddr, @@ -687,6 +689,7 @@ func runOnce( Transport: cfg.Transport, Carrier: cfg.Auth, RoomURL: roomURL, + ChannelID: cfg.ChannelID, KeyHex: cfg.KeyHex, LocalAddr: fmt.Sprintf("%s:%d", cfg.SOCKSHost, cfg.SOCKSPort), DNSServer: cfg.DNSServer, diff --git a/internal/client/client.go b/internal/client/client.go index 541b2d5..07e90d6 100644 --- a/internal/client/client.go +++ b/internal/client/client.go @@ -79,6 +79,7 @@ type Config struct { Transport string Carrier string RoomURL string + ChannelID string KeyHex string LocalAddr string DNSServer string @@ -198,6 +199,7 @@ func (c *Client) bringUpLink( Engine: cfg.Engine, URL: cfg.URL, Token: cfg.Token, + ChannelID: cfg.ChannelID, DeviceID: c.deviceID, Name: names.Generate(), OnData: c.onData, diff --git a/internal/config/config.go b/internal/config/config.go index 3cd5a0a..d831669 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -83,7 +83,8 @@ type Auth struct { // Room identifies the conference room. type Room struct { - ID string `yaml:"id"` + ID string `yaml:"id"` + Channel string `yaml:"channel"` } // Crypto holds the shared secret used to authenticate and encrypt the tunnel. @@ -249,6 +250,7 @@ func Apply(dst session.Config, f File) session.Config { dst.URL = pickString(dst.URL, f.Engine.URL) dst.Token = pickString(dst.Token, f.Engine.Token) dst.RoomID = pickString(dst.RoomID, f.Room.ID) + dst.ChannelID = pickString(dst.ChannelID, f.Room.Channel) dst.KeyHex = pickString(dst.KeyHex, f.Crypto.Key) dst.SOCKSHost = pickString(dst.SOCKSHost, f.SOCKS.Host) dst.SOCKSPort = pickInt(dst.SOCKSPort, f.SOCKS.Port) @@ -294,6 +296,7 @@ func ApplyProfile(base session.Config, p Profile) session.Config { dst.URL = overlayString(dst.URL, p.Engine.URL) dst.Token = overlayString(dst.Token, p.Engine.Token) dst.RoomID = overlayString(dst.RoomID, p.Room.ID) + dst.ChannelID = overlayString(dst.ChannelID, p.Room.Channel) dst.KeyHex = overlayString(dst.KeyHex, p.Crypto.Key) dst.SOCKSHost = overlayString(dst.SOCKSHost, p.SOCKS.Host) dst.SOCKSPort = overlayInt(dst.SOCKSPort, p.SOCKS.Port) diff --git a/internal/e2e/tunnel_test.go b/internal/e2e/tunnel_test.go index 46fcf57..f185e50 100644 --- a/internal/e2e/tunnel_test.go +++ b/internal/e2e/tunnel_test.go @@ -10,6 +10,7 @@ import ( "fmt" "io" "net" + "os" "strconv" "strings" "sync" @@ -760,6 +761,7 @@ func startRealTunnel( session.RegisterDefaults() socksAddr := freeLocalAddr(ctx, t) + channelID := fmt.Sprintf("e2e-%d-%d", os.Getpid(), time.Now().UnixNano()) runCtx, cancel := context.WithCancel(ctx) t.Cleanup(cancel) @@ -771,6 +773,7 @@ func startRealTunnel( Transport: transportName, Carrier: carrierName, RoomURL: roomURL, + ChannelID: channelID, KeyHex: testKeyHex, DNSServer: localDNSServer, VideoWidth: 1080, @@ -810,6 +813,7 @@ func startRealTunnel( Transport: transportName, Carrier: carrierName, RoomURL: roomURL, + ChannelID: channelID, KeyHex: testKeyHex, DeviceID: clientDeviceID, LocalAddr: socksAddr, diff --git a/internal/link/direct/direct.go b/internal/link/direct/direct.go index 65089ab..eec50c0 100644 --- a/internal/link/direct/direct.go +++ b/internal/link/direct/direct.go @@ -21,6 +21,7 @@ func New(ctx context.Context, cfg link.Config) (link.Link, error) { Engine: cfg.Engine, URL: cfg.URL, Token: cfg.Token, + ChannelID: cfg.ChannelID, DeviceID: cfg.DeviceID, Name: cfg.Name, OnData: cfg.OnData, diff --git a/internal/link/link.go b/internal/link/link.go index c8957ac..6031e65 100644 --- a/internal/link/link.go +++ b/internal/link/link.go @@ -42,6 +42,7 @@ type Config struct { Engine string URL string Token string + ChannelID string DeviceID string Name string OnData func([]byte) diff --git a/internal/server/server.go b/internal/server/server.go index 743c40b..e4cfd81 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -93,6 +93,7 @@ type Config struct { Transport string Carrier string RoomURL string + ChannelID string KeyHex string DNSServer string SOCKSProxyAddr string @@ -274,6 +275,7 @@ func (s *Server) bringUpLink( Engine: cfg.Engine, URL: cfg.URL, Token: cfg.Token, + ChannelID: cfg.ChannelID, DeviceID: "", Name: names.Generate(), OnData: s.onData, diff --git a/internal/transport/transport.go b/internal/transport/transport.go index 2f37a41..f0cab01 100644 --- a/internal/transport/transport.go +++ b/internal/transport/transport.go @@ -49,6 +49,7 @@ type Config struct { Engine string URL string Token string + ChannelID string DeviceID string Name string OnData func([]byte) diff --git a/internal/transport/videochannel/frame.go b/internal/transport/videochannel/frame.go index 30233a8..46289ac 100644 --- a/internal/transport/videochannel/frame.go +++ b/internal/transport/videochannel/frame.go @@ -7,9 +7,21 @@ import ( const ( protocolMagic uint32 = 0x4f565632 // OVV2 - protocolVersion byte = 1 + protocolVersion byte = 2 frameTypeData byte = 1 frameTypeAck byte = 2 + frameRoleAny byte = 0 + frameRoleServer byte = 1 + frameRoleClient byte = 2 + + frameBindingOff = 7 + frameSeqOff = 11 + frameCRCOff = 15 + frameAckLen = 19 + frameTotalLenOff = 19 + frameFragIdxOff = 23 + frameFragTotalOff = 25 + frameDataHdrLen = 27 ) var ( @@ -29,6 +41,8 @@ var ( type transportFrame struct { typ byte + role byte + binding uint32 seq uint32 crc uint32 totalLen uint32 @@ -65,26 +79,52 @@ func fragmentPayload(data []byte, maxSize int) [][]byte { } func encodeDataFrame(seq, crc uint32, totalLen, fragIdx, fragTotal int, payload []byte) []byte { - out := make([]byte, 22+len(payload)) + return encodeDataFrameForBinding(frameRoleAny, 0, seq, crc, totalLen, fragIdx, fragTotal, payload) +} + +func encodeDataFrameForRole(role byte, seq, crc uint32, totalLen, fragIdx, fragTotal int, payload []byte) []byte { + return encodeDataFrameForBinding(role, 0, seq, crc, totalLen, fragIdx, fragTotal, payload) +} + +func encodeDataFrameForBinding( + role byte, + binding uint32, + seq, crc uint32, + totalLen, fragIdx, fragTotal int, + payload []byte, +) []byte { + out := make([]byte, frameDataHdrLen+len(payload)) binary.BigEndian.PutUint32(out[0:4], protocolMagic) out[4] = protocolVersion out[5] = frameTypeData - binary.BigEndian.PutUint32(out[6:10], seq) - binary.BigEndian.PutUint32(out[10:14], crc) - binary.BigEndian.PutUint32(out[14:18], uint32(totalLen)) //nolint:gosec,lll // G115: bounded conversion verified by surrounding logic - binary.BigEndian.PutUint16(out[18:20], uint16(fragIdx)) //nolint:gosec,lll // G115: bounded conversion verified by surrounding logic - binary.BigEndian.PutUint16(out[20:22], uint16(fragTotal)) //nolint:gosec,lll // G115: bounded conversion verified by surrounding logic - copy(out[22:], payload) + out[6] = role + binary.BigEndian.PutUint32(out[frameBindingOff:frameSeqOff], binding) + binary.BigEndian.PutUint32(out[frameSeqOff:frameCRCOff], seq) + binary.BigEndian.PutUint32(out[frameCRCOff:frameAckLen], crc) + binary.BigEndian.PutUint32(out[frameTotalLenOff:frameFragIdxOff], uint32(totalLen)) //nolint:gosec,lll // G115: bounded conversion verified by surrounding logic + binary.BigEndian.PutUint16(out[frameFragIdxOff:frameFragTotalOff], uint16(fragIdx)) //nolint:gosec,lll // G115: bounded conversion verified by surrounding logic + binary.BigEndian.PutUint16(out[frameFragTotalOff:frameDataHdrLen], uint16(fragTotal)) //nolint:gosec,lll // G115: bounded conversion verified by surrounding logic + copy(out[frameDataHdrLen:], payload) return out } func encodeAckFrame(seq, crc uint32) []byte { - out := make([]byte, 14) + return encodeAckFrameForBinding(frameRoleAny, 0, seq, crc) +} + +func encodeAckFrameForRole(role byte, seq, crc uint32) []byte { + return encodeAckFrameForBinding(role, 0, seq, crc) +} + +func encodeAckFrameForBinding(role byte, binding, seq, crc uint32) []byte { + out := make([]byte, frameAckLen) binary.BigEndian.PutUint32(out[0:4], protocolMagic) out[4] = protocolVersion out[5] = frameTypeAck - binary.BigEndian.PutUint32(out[6:10], seq) - binary.BigEndian.PutUint32(out[10:14], crc) + out[6] = role + binary.BigEndian.PutUint32(out[frameBindingOff:frameSeqOff], binding) + binary.BigEndian.PutUint32(out[frameSeqOff:frameCRCOff], seq) + binary.BigEndian.PutUint32(out[frameCRCOff:frameAckLen], crc) return out } @@ -100,24 +140,36 @@ func decodeTransportFrame(data []byte) (transportFrame, error) { } frame := transportFrame{typ: data[5]} + if len(data) < frameSeqOff { + switch frame.typ { + case frameTypeAck: + return transportFrame{}, ErrAckTooShort + case frameTypeData: + return transportFrame{}, ErrDataTooShort + default: + return transportFrame{}, ErrUnexpectedFrameType + } + } + frame.role = data[6] + frame.binding = binary.BigEndian.Uint32(data[frameBindingOff:frameSeqOff]) switch frame.typ { case frameTypeAck: - if len(data) < 14 { + if len(data) < frameAckLen { return transportFrame{}, ErrAckTooShort } - frame.seq = binary.BigEndian.Uint32(data[6:10]) - frame.crc = binary.BigEndian.Uint32(data[10:14]) + frame.seq = binary.BigEndian.Uint32(data[frameSeqOff:frameCRCOff]) + frame.crc = binary.BigEndian.Uint32(data[frameCRCOff:frameAckLen]) return frame, nil case frameTypeData: - if len(data) < 22 { + if len(data) < frameDataHdrLen { return transportFrame{}, ErrDataTooShort } - frame.seq = binary.BigEndian.Uint32(data[6:10]) - frame.crc = binary.BigEndian.Uint32(data[10:14]) - frame.totalLen = binary.BigEndian.Uint32(data[14:18]) - frame.fragIdx = binary.BigEndian.Uint16(data[18:20]) - frame.fragTotal = binary.BigEndian.Uint16(data[20:22]) - frame.payload = append([]byte(nil), data[22:]...) + frame.seq = binary.BigEndian.Uint32(data[frameSeqOff:frameCRCOff]) + frame.crc = binary.BigEndian.Uint32(data[frameCRCOff:frameAckLen]) + frame.totalLen = binary.BigEndian.Uint32(data[frameTotalLenOff:frameFragIdxOff]) + frame.fragIdx = binary.BigEndian.Uint16(data[frameFragIdxOff:frameFragTotalOff]) + frame.fragTotal = binary.BigEndian.Uint16(data[frameFragTotalOff:frameDataHdrLen]) + frame.payload = append([]byte(nil), data[frameDataHdrLen:]...) return frame, nil default: return transportFrame{}, ErrUnexpectedFrameType diff --git a/internal/transport/videochannel/transport.go b/internal/transport/videochannel/transport.go index 8468100..56a73bc 100644 --- a/internal/transport/videochannel/transport.go +++ b/internal/transport/videochannel/transport.go @@ -45,8 +45,8 @@ type streamTransport struct { codec codecSpec encoder *ffmpegEncoder encoderMu sync.Mutex - decoder *ffmpegDecoder decoderMu sync.Mutex + decoders map[*ffmpegDecoder]struct{} onData func([]byte) outbound chan []byte outboundAck chan []byte @@ -72,6 +72,9 @@ type streamTransport struct { videoCodec string videoTileModule int videoTileRS int + localRole byte + remoteRole byte + bindingToken uint32 runCtx context.Context //nolint:containedctx,lll // long-lived context drives idle-frame loops bound to this transport's lifetime idleFrame []byte @@ -138,6 +141,7 @@ func New(ctx context.Context, cfg transport.Config) (transport.Transport, error) outboundAck: make(chan []byte, 64), closeCh: make(chan struct{}), writerDone: make(chan struct{}), + decoders: make(map[*ffmpegDecoder]struct{}), ackWaiters: make(map[uint32]chan uint32), inbound: make(map[uint32]*inboundMessage), delivered: make(map[uint32]uint32), @@ -151,6 +155,9 @@ func New(ctx context.Context, cfg transport.Config) (transport.Transport, error) videoCodec: cfg.VideoCodec, videoTileModule: tileModule, videoTileRS: tileRS, + localRole: localFrameRole(cfg.DeviceID), + remoteRole: remoteFrameRole(cfg.DeviceID), + bindingToken: bindingToken(cfg.ChannelID), runCtx: ctx, } @@ -222,7 +229,7 @@ func (p *streamTransport) Send(data []byte) error { for range maxSendAttempts { for idx, fragment := range fragments { - frame := encodeDataFrame(seq, crc, len(data), idx, len(fragments), fragment) + frame := encodeDataFrameForBinding(p.localRole, p.bindingToken, seq, crc, len(data), idx, len(fragments), fragment) if err := p.enqueueFrame(frame, false); err != nil { return err } @@ -257,9 +264,10 @@ func (p *streamTransport) Close() error { p.encoderMu.Unlock() p.decoderMu.Lock() - if p.decoder != nil { - _ = p.decoder.Close() + for decoder := range p.decoders { + _ = decoder.Close() } + p.decoders = nil p.decoderMu.Unlock() if p.writerUp.Load() { @@ -445,8 +453,8 @@ func (p *streamTransport) enqueueFrame(frame []byte, priority bool) error { func (p *streamTransport) popDecoderFrames(decoder *ffmpegDecoder) { defer func() { p.decoderMu.Lock() - if p.decoder == decoder { - p.decoder = nil + if p.decoders != nil { + delete(p.decoders, decoder) } p.decoderMu.Unlock() _ = decoder.Close() @@ -511,15 +519,12 @@ func (p *streamTransport) handleRemoteTrack(track *webrtc.TrackRemote, _ *webrtc } p.decoderMu.Lock() - if p.closed.Load() { + if p.closed.Load() || p.decoders == nil { p.decoderMu.Unlock() _ = decoder.Close() return } - if p.decoder != nil { - _ = p.decoder.Close() - } - p.decoder = decoder + p.decoders[decoder] = struct{}{} p.decoderMu.Unlock() go p.popDecoderFrames(decoder) @@ -542,6 +547,9 @@ func (p *streamTransport) handleFrame(frame []byte) { logger.Debugf("videochannel decode transport frame error: %v", err) return } + if !p.acceptFrame(decoded) { + return + } switch decoded.typ { case frameTypeAck: @@ -620,7 +628,7 @@ func (p *streamTransport) handleInboundFrame(frame transportFrame) { } func (p *streamTransport) sendAck(seq, crc uint32) { - _ = p.enqueueFrame(encodeAckFrame(seq, crc), true) + _ = p.enqueueFrame(encodeAckFrameForBinding(p.localRole, p.bindingToken, seq, crc), true) } func (p *streamTransport) resolveAck(seq, crc uint32) { @@ -648,3 +656,31 @@ func randomID() string { } return hex.EncodeToString(b[:]) } + +func localFrameRole(deviceID string) byte { + if deviceID == "" { + return frameRoleServer + } + return frameRoleClient +} + +func remoteFrameRole(deviceID string) byte { + if deviceID == "" { + return frameRoleClient + } + return frameRoleServer +} + +func bindingToken(channelID string) uint32 { + token := crc32.ChecksumIEEE([]byte(channelID)) + if token == 0 && channelID != "" { + token = 1 + } + return token +} + +func (p *streamTransport) acceptFrame(frame transportFrame) bool { + roleOK := frame.role == frameRoleAny || frame.role == p.remoteRole + bindingOK := frame.binding == 0 || frame.binding == p.bindingToken + return roleOK && bindingOK +} diff --git a/internal/transport/videochannel/transport_test.go b/internal/transport/videochannel/transport_test.go index 83e0f57..ae5ace8 100644 --- a/internal/transport/videochannel/transport_test.go +++ b/internal/transport/videochannel/transport_test.go @@ -62,12 +62,13 @@ func TestTileIdleFrameIgnored(t *testing.T) { } func TestTransportFrameRoundTrip(t *testing.T) { - encoded := encodeDataFrame(42, 0xdeadbeef, 1024, 1, 3, []byte("chunk")) + encoded := encodeDataFrameForBinding(frameRoleClient, 0x12345678, 42, 0xdeadbeef, 1024, 1, 3, []byte("chunk")) decoded, err := decodeTransportFrame(encoded) if err != nil { t.Fatalf("decodeTransportFrame failed: %v", err) } - if decoded.typ != frameTypeData || decoded.seq != 42 || decoded.crc != 0xdeadbeef { + if decoded.typ != frameTypeData || decoded.role != frameRoleClient || + decoded.binding != 0x12345678 || decoded.seq != 42 || decoded.crc != 0xdeadbeef { t.Fatalf("unexpected frame header: %+v", decoded) } if decoded.totalLen != 1024 || decoded.fragIdx != 1 || decoded.fragTotal != 3 { @@ -77,3 +78,24 @@ func TestTransportFrameRoundTrip(t *testing.T) { t.Fatalf("payload mismatch: got=%q", decoded.payload) } } + +func TestAcceptFrameRole(t *testing.T) { + server := &streamTransport{remoteRole: frameRoleClient, bindingToken: 10} + if !server.acceptFrame(transportFrame{role: frameRoleClient, binding: 10}) { + t.Fatal("server rejected client frame") + } + if server.acceptFrame(transportFrame{role: frameRoleServer, binding: 10}) { + t.Fatal("server accepted server frame") + } + if server.acceptFrame(transportFrame{role: frameRoleClient, binding: 11}) { + t.Fatal("server accepted different binding") + } + + client := &streamTransport{remoteRole: frameRoleServer, bindingToken: 20} + if !client.acceptFrame(transportFrame{role: frameRoleServer, binding: 20}) { + t.Fatal("client rejected server frame") + } + if client.acceptFrame(transportFrame{role: frameRoleClient, binding: 20}) { + t.Fatal("client accepted client frame") + } +}