From 08a80a940067f9bf098f74b6209ae8f143a4fca1 Mon Sep 17 00:00:00 2001 From: zarazaex59 Date: Tue, 7 Apr 2026 00:10:04 +0300 Subject: [PATCH] feat(olcrtc): Add WebRTC tunneling with encryption and SOCKS5 proxy --- .gitignore | 3 +- cmd/olcrtc/main.go | 55 +++++++ go.mod | 31 +++- go.sum | 56 +++++++ internal/client/client.go | 214 ++++++++++++++++++++++++ internal/crypto/chacha.go | 48 ++++++ internal/mux/mux.go | 153 ++++++++++++++++++ internal/server/server.go | 157 ++++++++++++++++++ internal/telemost/api.go | 64 ++++++++ internal/telemost/peer.go | 333 ++++++++++++++++++++++++++++++++++++++ 10 files changed, 1112 insertions(+), 2 deletions(-) create mode 100644 cmd/olcrtc/main.go create mode 100644 go.sum create mode 100644 internal/client/client.go create mode 100644 internal/crypto/chacha.go create mode 100644 internal/mux/mux.go create mode 100644 internal/server/server.go create mode 100644 internal/telemost/api.go create mode 100644 internal/telemost/peer.go diff --git a/.gitignore b/.gitignore index 8cc4754..e049dc1 100644 --- a/.gitignore +++ b/.gitignore @@ -241,4 +241,5 @@ go.work.sum # Editor/IDE # .idea/ -# .vscode/ \ No newline at end of file +# .vscode/ +build/ \ No newline at end of file diff --git a/cmd/olcrtc/main.go b/cmd/olcrtc/main.go new file mode 100644 index 0000000..c39eba0 --- /dev/null +++ b/cmd/olcrtc/main.go @@ -0,0 +1,55 @@ +package main + +import ( + "flag" + "log" + + "github.com/zarazaex69/olcrtc/internal/client" + "github.com/zarazaex69/olcrtc/internal/server" +) + +func main() { + var ( + mode string + roomID string + provider string + socksPort int + keyHex string + debug bool + ) + + flag.StringVar(&mode, "mode", "", "Mode: srv or cnc") + flag.StringVar(&roomID, "id", "", "Telemost room ID") + flag.StringVar(&provider, "provider", "telemost", "Provider (telemost only)") + flag.IntVar(&socksPort, "socks-port", 1080, "SOCKS5 port (client only)") + flag.StringVar(&keyHex, "key", "", "Shared encryption key (hex)") + flag.BoolVar(&debug, "debug", false, "Enable debug logging") + flag.Parse() + + if !debug { + log.SetFlags(log.Ltime) + } + + if provider != "telemost" { + log.Fatal("Only telemost provider supported") + } + + if roomID == "" { + log.Fatal("Room ID required") + } + + roomURL := "https://telemost.yandex.ru/j/" + roomID + + switch mode { + case "srv": + if err := server.Run(roomURL, keyHex); err != nil { + log.Fatal(err) + } + case "cnc": + if err := client.Run(roomURL, keyHex, socksPort); err != nil { + log.Fatal(err) + } + default: + log.Fatal("Specify -mode srv or -mode cnc") + } +} diff --git a/go.mod b/go.mod index 8173784..8c5258e 100644 --- a/go.mod +++ b/go.mod @@ -1,3 +1,32 @@ module github.com/zarazaex69/olcrtc -go 1.26.1 +go 1.26 + +require ( + github.com/google/uuid v1.6.0 + github.com/gorilla/websocket v1.5.3 + github.com/pion/webrtc/v4 v4.2.11 + golang.org/x/crypto v0.48.0 +) + +require ( + github.com/pion/datachannel v1.6.0 // indirect + github.com/pion/dtls/v3 v3.1.2 // indirect + github.com/pion/ice/v4 v4.2.2 // indirect + github.com/pion/interceptor v0.1.44 // indirect + github.com/pion/logging v0.2.4 // indirect + github.com/pion/mdns/v2 v2.1.0 // indirect + github.com/pion/randutil v0.1.0 // indirect + github.com/pion/rtcp v1.2.16 // indirect + github.com/pion/rtp v1.10.1 // indirect + github.com/pion/sctp v1.9.4 // indirect + github.com/pion/sdp/v3 v3.0.18 // indirect + github.com/pion/srtp/v3 v3.0.10 // indirect + github.com/pion/stun/v3 v3.1.1 // indirect + github.com/pion/transport/v4 v4.0.1 // indirect + github.com/pion/turn/v4 v4.1.4 // indirect + github.com/wlynxg/anet v0.0.5 // indirect + golang.org/x/net v0.50.0 // indirect + golang.org/x/sys v0.41.0 // indirect + golang.org/x/time v0.10.0 // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..54f7f1c --- /dev/null +++ b/go.sum @@ -0,0 +1,56 @@ +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= +github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg= +github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= +github.com/pion/datachannel v1.6.0 h1:XecBlj+cvsxhAMZWFfFcPyUaDZtd7IJvrXqlXD/53i0= +github.com/pion/datachannel v1.6.0/go.mod h1:ur+wzYF8mWdC+Mkis5Thosk+u/VOL287apDNEbFpsIk= +github.com/pion/dtls/v3 v3.1.2 h1:gqEdOUXLtCGW+afsBLO0LtDD8GnuBBjEy6HRtyofZTc= +github.com/pion/dtls/v3 v3.1.2/go.mod h1:Hw/igcX4pdY69z1Hgv5x7wJFrUkdgHwAn/Q/uo7YHRo= +github.com/pion/ice/v4 v4.2.2 h1:dQJzzcgTFHDYyV3BoCfjPeX+JEtr58BWPi4PGyo6Vjg= +github.com/pion/ice/v4 v4.2.2/go.mod h1:2quLV1S5v1tAx3VvAJaH//KGitRXvo4RKlX6D3tnN+c= +github.com/pion/interceptor v0.1.44 h1:sNlZwM8dWXU9JQAkJh8xrarC0Etn8Oolcniukmuy0/I= +github.com/pion/interceptor v0.1.44/go.mod h1:4atVlBkcgXuUP+ykQF0qOCGU2j7pQzX2ofvPRFsY5RY= +github.com/pion/logging v0.2.4 h1:tTew+7cmQ+Mc1pTBLKH2puKsOvhm32dROumOZ655zB8= +github.com/pion/logging v0.2.4/go.mod h1:DffhXTKYdNZU+KtJ5pyQDjvOAh/GsNSyv1lbkFbe3so= +github.com/pion/mdns/v2 v2.1.0 h1:3IJ9+Xio6tWYjhN6WwuY142P/1jA0D5ERaIqawg/fOY= +github.com/pion/mdns/v2 v2.1.0/go.mod h1:pcez23GdynwcfRU1977qKU0mDxSeucttSHbCSfFOd9A= +github.com/pion/randutil v0.1.0 h1:CFG1UdESneORglEsnimhUjf33Rwjubwj6xfiOXBa3mA= +github.com/pion/randutil v0.1.0/go.mod h1:XcJrSMMbbMRhASFVOlj/5hQial/Y8oH/HVo7TBZq+j8= +github.com/pion/rtcp v1.2.16 h1:fk1B1dNW4hsI78XUCljZJlC4kZOPk67mNRuQ0fcEkSo= +github.com/pion/rtcp v1.2.16/go.mod h1:/as7VKfYbs5NIb4h6muQ35kQF/J0ZVNz2Z3xKoCBYOo= +github.com/pion/rtp v1.10.1 h1:xP1prZcCTUuhO2c83XtxyOHJteISg6o8iPsE2acaMtA= +github.com/pion/rtp v1.10.1/go.mod h1:rF5nS1GqbR7H/TCpKwylzeq6yDM+MM6k+On5EgeThEM= +github.com/pion/sctp v1.9.4 h1:cMxEu0F5tbP4qH07bKf1Zjf4rUih9LIo0qQt424e258= +github.com/pion/sctp v1.9.4/go.mod h1:N20Dq6LY+JvJDAh9VVh1JELngb2rQ8dPgds5yBWiPgw= +github.com/pion/sdp/v3 v3.0.18 h1:l0bAXazKHpepazVdp+tPYnrsy9dfh7ZbT8DxesH5ZnI= +github.com/pion/sdp/v3 v3.0.18/go.mod h1:ZREGo6A9ZygQ9XkqAj5xYCQtQpif0i6Pa81HOiAdqQ8= +github.com/pion/srtp/v3 v3.0.10 h1:tFirkpBb3XccP5VEXLi50GqXhv5SKPxqrdlhDCJlZrQ= +github.com/pion/srtp/v3 v3.0.10/go.mod h1:3mOTIB0cq9qlbn59V4ozvv9ClW/BSEbRp4cY0VtaR7M= +github.com/pion/stun/v3 v3.1.1 h1:CkQxveJ4xGQjulGSROXbXq94TAWu8gIX2dT+ePhUkqw= +github.com/pion/stun/v3 v3.1.1/go.mod h1:qC1DfmcCTQjl9PBaMa5wSn3x9IPmKxSdcCsxBcDBndM= +github.com/pion/transport/v3 v3.1.1 h1:Tr684+fnnKlhPceU+ICdrw6KKkTms+5qHMgw6bIkYOM= +github.com/pion/transport/v3 v3.1.1/go.mod h1:+c2eewC5WJQHiAA46fkMMzoYZSuGzA/7E2FPrOYHctQ= +github.com/pion/transport/v4 v4.0.1 h1:sdROELU6BZ63Ab7FrOLn13M6YdJLY20wldXW2Cu2k8o= +github.com/pion/transport/v4 v4.0.1/go.mod h1:nEuEA4AD5lPdcIegQDpVLgNoDGreqM/YqmEx3ovP4jM= +github.com/pion/turn/v4 v4.1.4 h1:EU11yMXKIsK43FhcUnjLlrhE4nboHZq+TXBIi3QpcxQ= +github.com/pion/turn/v4 v4.1.4/go.mod h1:ES1DXVFKnOhuDkqn9hn5VJlSWmZPaRJLyBXoOeO/BmQ= +github.com/pion/webrtc/v4 v4.2.11 h1:QUX1QZKlNIn4O7U5JxLPGP0sV5RTncZkzu9SPR3jVNU= +github.com/pion/webrtc/v4 v4.2.11/go.mod h1:s/rAiyy77GyRFrZMx+Ls6aua26dIBPudH8/ZHYbIRWY= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U= +github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U= +github.com/wlynxg/anet v0.0.5 h1:J3VJGi1gvo0JwZ/P1/Yc/8p63SoW98B5dHkYDmpgvvU= +github.com/wlynxg/anet v0.0.5/go.mod h1:eay5PRQr7fIVAMbTbchTnO9gG65Hg/uYGdc7mguHxoA= +golang.org/x/crypto v0.48.0 h1:/VRzVqiRSggnhY7gNRxPauEQ5Drw9haKdM0jqfcCFts= +golang.org/x/crypto v0.48.0/go.mod h1:r0kV5h3qnFPlQnBSrULhlsRfryS2pmewsg+XfMgkVos= +golang.org/x/net v0.50.0 h1:ucWh9eiCGyDR3vtzso0WMQinm2Dnt8cFMuQa9K33J60= +golang.org/x/net v0.50.0/go.mod h1:UgoSli3F/pBgdJBHCTc+tp3gmrU4XswgGRgtnwWTfyM= +golang.org/x/sys v0.41.0 h1:Ivj+2Cp/ylzLiEU89QhWblYnOE9zerudt9Ftecq2C6k= +golang.org/x/sys v0.41.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks= +golang.org/x/time v0.10.0 h1:3usCWA8tQn0L8+hFJQNgzpWbd89begxN66o1Ojdn5L4= +golang.org/x/time v0.10.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/internal/client/client.go b/internal/client/client.go new file mode 100644 index 0000000..e1df9a1 --- /dev/null +++ b/internal/client/client.go @@ -0,0 +1,214 @@ +package client + +import ( + "context" + "crypto/rand" + "encoding/binary" + "encoding/hex" + "encoding/json" + "fmt" + "io" + "log" + "net" + "time" + + "github.com/zarazaex69/olcrtc/internal/crypto" + "github.com/zarazaex69/olcrtc/internal/mux" + "github.com/zarazaex69/olcrtc/internal/telemost" +) + +type Client struct { + peer *telemost.Peer + cipher *crypto.Cipher + mux *mux.Multiplexer +} + +func Run(roomURL, keyHex string, socksPort int) error { + var key []byte + var err error + + if keyHex == "" { + key = make([]byte, 32) + if _, err := rand.Read(key); err != nil { + return err + } + log.Printf("Generated key: %x", key) + } else { + key, err = hex.DecodeString(keyHex) + if err != nil { + return err + } + } + + cipher, err := crypto.NewCipher(string(key)) + if err != nil { + return err + } + + c := &Client{ + cipher: cipher, + } + + c.mux = mux.New(func(frame []byte) error { + encrypted, err := c.cipher.Encrypt(frame) + if err != nil { + return err + } + return c.peer.Send(encrypted) + }) + + peer, err := telemost.NewPeer(roomURL, "OlcRTC-Client", c.onData) + if err != nil { + return err + } + c.peer = peer + + log.Println("Connecting to Telemost...") + if err := peer.Connect(context.Background()); err != nil { + return err + } + log.Println("Connected to Telemost") + + return c.runSOCKS5(socksPort) +} + +func (c *Client) onData(data []byte) { + plaintext, err := c.cipher.Decrypt(data) + if err != nil { + return + } + + c.mux.HandleFrame(plaintext) +} + +func (c *Client) runSOCKS5(port int) error { + listener, err := net.Listen("tcp", fmt.Sprintf("127.0.0.1:%d", port)) + if err != nil { + return err + } + defer listener.Close() + + log.Printf("SOCKS5 proxy listening on 127.0.0.1:%d", port) + + for { + conn, err := listener.Accept() + if err != nil { + log.Printf("Accept error: %v", err) + continue + } + + go c.handleSOCKS5(conn) + } +} + +func (c *Client) handleSOCKS5(conn net.Conn) { + defer conn.Close() + + buf := make([]byte, 256) + + if _, err := io.ReadFull(conn, buf[:2]); err != nil { + return + } + + if buf[0] != 5 { + return + } + + nmethods := buf[1] + if _, err := io.ReadFull(conn, buf[:nmethods]); err != nil { + return + } + + conn.Write([]byte{5, 0}) + + if _, err := io.ReadFull(conn, buf[:4]); err != nil { + return + } + + if buf[1] != 1 { + conn.Write([]byte{5, 7, 0, 1, 0, 0, 0, 0, 0, 0}) + return + } + + var addr string + atyp := buf[3] + + switch atyp { + case 1: + if _, err := io.ReadFull(conn, buf[:4]); err != nil { + return + } + addr = fmt.Sprintf("%d.%d.%d.%d", buf[0], buf[1], buf[2], buf[3]) + case 3: + if _, err := io.ReadFull(conn, buf[:1]); err != nil { + return + } + length := buf[0] + if _, err := io.ReadFull(conn, buf[:length]); err != nil { + return + } + addr = string(buf[:length]) + default: + conn.Write([]byte{5, 8, 0, 1, 0, 0, 0, 0, 0, 0}) + return + } + + if _, err := io.ReadFull(conn, buf[:2]); err != nil { + return + } + port := binary.BigEndian.Uint16(buf[:2]) + + sid := c.mux.OpenStream() + log.Printf("SOCKS5 connect sid=%d %s:%d", sid, addr, port) + + req := map[string]interface{}{ + "cmd": "connect", + "addr": addr, + "port": port, + } + + reqData, _ := json.Marshal(req) + c.mux.SendData(sid, reqData) + + time.Sleep(500 * time.Millisecond) + + conn.Write([]byte{5, 0, 0, 1, 0, 0, 0, 0, 0, 0}) + + done := make(chan struct{}) + + go func() { + defer close(done) + buf := make([]byte, 4096) + for { + n, err := conn.Read(buf) + if err != nil { + c.mux.CloseStream(sid) + return + } + c.mux.SendData(sid, buf[:n]) + } + }() + + go func() { + ticker := time.NewTicker(10 * time.Millisecond) + defer ticker.Stop() + + for { + select { + case <-done: + return + case <-ticker.C: + data := c.mux.ReadStream(sid) + if len(data) > 0 { + conn.Write(data) + } + + if c.mux.StreamClosed(sid) { + return + } + } + } + }() + + <-done +} diff --git a/internal/crypto/chacha.go b/internal/crypto/chacha.go new file mode 100644 index 0000000..b9af70e --- /dev/null +++ b/internal/crypto/chacha.go @@ -0,0 +1,48 @@ +package crypto + +import ( + "crypto/cipher" + "crypto/rand" + "errors" + + "golang.org/x/crypto/chacha20poly1305" +) + +type Cipher struct { + aead cipher.AEAD +} + +func NewCipher(keyStr string) (*Cipher, error) { + key := []byte(keyStr) + if len(key) != chacha20poly1305.KeySize { + return nil, errors.New("invalid key size") + } + + aead, err := chacha20poly1305.NewX(key) + if err != nil { + return nil, err + } + + return &Cipher{aead: aead}, nil +} + +func (c *Cipher) Encrypt(plaintext []byte) ([]byte, error) { + nonce := make([]byte, c.aead.NonceSize()) + if _, err := rand.Read(nonce); err != nil { + return nil, err + } + + ciphertext := c.aead.Seal(nonce, nonce, plaintext, nil) + return ciphertext, nil +} + +func (c *Cipher) Decrypt(ciphertext []byte) ([]byte, error) { + if len(ciphertext) < c.aead.NonceSize() { + return nil, errors.New("ciphertext too short") + } + + nonce := ciphertext[:c.aead.NonceSize()] + encrypted := ciphertext[c.aead.NonceSize():] + + return c.aead.Open(nil, nonce, encrypted, nil) +} diff --git a/internal/mux/mux.go b/internal/mux/mux.go new file mode 100644 index 0000000..a6debe1 --- /dev/null +++ b/internal/mux/mux.go @@ -0,0 +1,153 @@ +package mux + +import ( + "encoding/binary" + "sync" +) + +type Stream struct { + ID uint16 + recvBuf []byte + closed bool + mu sync.Mutex +} + +type Multiplexer struct { + streams map[uint16]*Stream + nextID uint16 + onSend func([]byte) error + mu sync.RWMutex +} + +func New(onSend func([]byte) error) *Multiplexer { + return &Multiplexer{ + streams: make(map[uint16]*Stream), + nextID: 1, + onSend: onSend, + } +} + +func (m *Multiplexer) OpenStream() uint16 { + m.mu.Lock() + defer m.mu.Unlock() + + sid := m.nextID + m.nextID++ + + m.streams[sid] = &Stream{ + ID: sid, + recvBuf: make([]byte, 0), + } + + return sid +} + +func (m *Multiplexer) SendData(sid uint16, data []byte) error { + m.mu.RLock() + stream, exists := m.streams[sid] + m.mu.RUnlock() + + if !exists || stream.closed { + return nil + } + + const chunkSize = 7168 + for i := 0; i < len(data); i += chunkSize { + end := i + chunkSize + if end > len(data) { + end = len(data) + } + + chunk := data[i:end] + frame := make([]byte, 4+len(chunk)) + binary.BigEndian.PutUint16(frame[0:2], sid) + binary.BigEndian.PutUint16(frame[2:4], uint16(len(chunk))) + copy(frame[4:], chunk) + + if err := m.onSend(frame); err != nil { + return err + } + } + + return nil +} + +func (m *Multiplexer) CloseStream(sid uint16) error { + m.mu.Lock() + defer m.mu.Unlock() + + if stream, exists := m.streams[sid]; exists { + stream.closed = true + } + + frame := make([]byte, 4) + binary.BigEndian.PutUint16(frame[0:2], sid) + binary.BigEndian.PutUint16(frame[2:4], 0) + + return m.onSend(frame) +} + +func (m *Multiplexer) HandleFrame(frame []byte) { + if len(frame) < 4 { + return + } + + sid := binary.BigEndian.Uint16(frame[0:2]) + length := binary.BigEndian.Uint16(frame[2:4]) + + if length == 0 { + m.mu.Lock() + if stream, exists := m.streams[sid]; exists { + stream.closed = true + } + m.mu.Unlock() + return + } + + data := frame[4 : 4+length] + + m.mu.Lock() + stream, exists := m.streams[sid] + if !exists { + stream = &Stream{ + ID: sid, + recvBuf: make([]byte, 0), + } + m.streams[sid] = stream + } + stream.recvBuf = append(stream.recvBuf, data...) + m.mu.Unlock() +} + +func (m *Multiplexer) ReadStream(sid uint16) []byte { + m.mu.Lock() + defer m.mu.Unlock() + + stream, exists := m.streams[sid] + if !exists || len(stream.recvBuf) == 0 { + return nil + } + + data := stream.recvBuf + stream.recvBuf = make([]byte, 0) + return data +} + +func (m *Multiplexer) StreamClosed(sid uint16) bool { + m.mu.RLock() + defer m.mu.RUnlock() + + stream, exists := m.streams[sid] + return !exists || stream.closed +} + +func (m *Multiplexer) GetStreams() []uint16 { + m.mu.RLock() + defer m.mu.RUnlock() + + sids := make([]uint16, 0, len(m.streams)) + for sid := range m.streams { + sids = append(sids, sid) + } + return sids +} diff --git a/internal/server/server.go b/internal/server/server.go new file mode 100644 index 0000000..dfa3cc3 --- /dev/null +++ b/internal/server/server.go @@ -0,0 +1,157 @@ +package server + +import ( + "context" + "crypto/rand" + "encoding/hex" + "encoding/json" + "fmt" + "io" + "log" + "net" + "time" + + "github.com/zarazaex69/olcrtc/internal/crypto" + "github.com/zarazaex69/olcrtc/internal/mux" + "github.com/zarazaex69/olcrtc/internal/telemost" +) + +type Server struct { + peer *telemost.Peer + cipher *crypto.Cipher + mux *mux.Multiplexer + connections map[uint16]net.Conn +} + +type ConnectRequest struct { + Cmd string `json:"cmd"` + Addr string `json:"addr"` + Port int `json:"port"` +} + +func Run(roomURL, keyHex string) error { + var key []byte + var err error + + if keyHex == "" { + key = make([]byte, 32) + if _, err := rand.Read(key); err != nil { + return err + } + log.Printf("Generated key: %x", key) + } else { + key, err = hex.DecodeString(keyHex) + if err != nil { + return err + } + } + + cipher, err := crypto.NewCipher(string(key)) + if err != nil { + return err + } + + s := &Server{ + cipher: cipher, + connections: make(map[uint16]net.Conn), + } + + s.mux = mux.New(func(frame []byte) error { + encrypted, err := s.cipher.Encrypt(frame) + if err != nil { + return err + } + return s.peer.Send(encrypted) + }) + + peer, err := telemost.NewPeer(roomURL, "OlcRTC-Server", s.onData) + if err != nil { + return err + } + s.peer = peer + + log.Println("Connecting to Telemost...") + if err := peer.Connect(context.Background()); err != nil { + return err + } + log.Println("Connected to Telemost") + + return s.run() +} + +func (s *Server) onData(data []byte) { + plaintext, err := s.cipher.Decrypt(data) + if err != nil { + return + } + + s.mux.HandleFrame(plaintext) +} + +func (s *Server) run() error { + ticker := time.NewTicker(10 * time.Millisecond) + defer ticker.Stop() + + for range ticker.C { + for _, sid := range s.mux.GetStreams() { + data := s.mux.ReadStream(sid) + if len(data) > 0 { + if conn, exists := s.connections[sid]; exists { + if _, err := conn.Write(data); err != nil { + s.mux.CloseStream(sid) + conn.Close() + delete(s.connections, sid) + } + } else { + var req ConnectRequest + if err := json.Unmarshal(data, &req); err == nil && req.Cmd == "connect" { + go s.handleConnect(sid, req) + } + } + } + + if s.mux.StreamClosed(sid) { + if conn, exists := s.connections[sid]; exists { + conn.Close() + delete(s.connections, sid) + } + } + } + } + + return nil +} + +func (s *Server) handleConnect(sid uint16, req ConnectRequest) { + addr := fmt.Sprintf("%s:%d", req.Addr, req.Port) + log.Printf("Connecting sid=%d to %s", sid, addr) + + conn, err := net.DialTimeout("tcp", addr, 10*time.Second) + if err != nil { + log.Printf("Connect failed sid=%d: %v", sid, err) + s.mux.CloseStream(sid) + return + } + + s.connections[sid] = conn + log.Printf("Connected sid=%d", sid) + + go func() { + buf := make([]byte, 4096) + for { + n, err := conn.Read(buf) + if err != nil { + if err != io.EOF { + log.Printf("Read error sid=%d: %v", sid, err) + } + s.mux.CloseStream(sid) + return + } + + if err := s.mux.SendData(sid, buf[:n]); err != nil { + log.Printf("Send error sid=%d: %v", sid, err) + return + } + } + }() +} diff --git a/internal/telemost/api.go b/internal/telemost/api.go new file mode 100644 index 0000000..3c02a74 --- /dev/null +++ b/internal/telemost/api.go @@ -0,0 +1,64 @@ +package telemost + +import ( + "encoding/json" + "fmt" + "io" + "net/http" + "net/url" + + "github.com/google/uuid" +) + +const apiBase = "https://cloud-api.yandex.ru/telemost_front/v2/telemost" + +type ConnectionInfo struct { + RoomID string `json:"room_id"` + PeerID string `json:"peer_id"` + Credentials string `json:"credentials"` + ClientConfig struct { + MediaServerURL string `json:"media_server_url"` + } `json:"client_configuration"` +} + +func GetConnectionInfo(roomURL, displayName string) (*ConnectionInfo, error) { + u := fmt.Sprintf("%s/conferences/%s/connection", apiBase, url.QueryEscape(roomURL)) + + req, err := http.NewRequest("GET", u, nil) + if err != nil { + return nil, err + } + + q := req.URL.Query() + q.Add("next_gen_media_platform_allowed", "true") + q.Add("display_name", displayName) + q.Add("waiting_room_supported", "true") + req.URL.RawQuery = q.Encode() + + req.Header.Set("User-Agent", "Mozilla/5.0 (X11; Linux x86_64; rv:149.0) Gecko/20100101 Firefox/149.0") + req.Header.Set("Accept", "*/*") + req.Header.Set("Content-Type", "application/json") + req.Header.Set("Client-Instance-Id", uuid.New().String()) + req.Header.Set("X-Telemost-Client-Version", "187.1.0") + req.Header.Set("Idempotency-Key", uuid.New().String()) + req.Header.Set("Origin", "https://telemost.yandex.ru") + req.Header.Set("Referer", "https://telemost.yandex.ru/") + + resp, err := http.DefaultClient.Do(req) + if err != nil { + return nil, err + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + body, _ := io.ReadAll(resp.Body) + return nil, fmt.Errorf("API error %d: %s", resp.StatusCode, body) + } + + var info ConnectionInfo + if err := json.NewDecoder(resp.Body).Decode(&info); err != nil { + return nil, err + } + + return &info, nil +} diff --git a/internal/telemost/peer.go b/internal/telemost/peer.go new file mode 100644 index 0000000..dfbcef9 --- /dev/null +++ b/internal/telemost/peer.go @@ -0,0 +1,333 @@ +package telemost + +import ( + "context" + "fmt" + "log" + "strings" + "time" + + "github.com/google/uuid" + "github.com/gorilla/websocket" + "github.com/pion/webrtc/v4" +) + +type Peer struct { + roomURL string + name string + conn *ConnectionInfo + ws *websocket.Conn + pcSub *webrtc.PeerConnection + pcPub *webrtc.PeerConnection + dc *webrtc.DataChannel + onData func([]byte) +} + +func NewPeer(roomURL, name string, onData func([]byte)) (*Peer, error) { + conn, err := GetConnectionInfo(roomURL, name) + if err != nil { + return nil, err + } + + return &Peer{ + roomURL: roomURL, + name: name, + conn: conn, + onData: onData, + }, nil +} + +func (p *Peer) Connect(ctx context.Context) error { + config := webrtc.Configuration{ + ICEServers: []webrtc.ICEServer{ + {URLs: []string{"stun:stun.rtc.yandex.net:3478"}}, + }, + SDPSemantics: webrtc.SDPSemanticsUnifiedPlan, + } + + settingEngine := webrtc.SettingEngine{} + api := webrtc.NewAPI(webrtc.WithSettingEngine(settingEngine)) + + var err error + p.pcSub, err = api.NewPeerConnection(config) + if err != nil { + return err + } + + p.pcPub, err = api.NewPeerConnection(config) + if err != nil { + return err + } + + p.dc, err = p.pcPub.CreateDataChannel("olcrtc", nil) + if err != nil { + return err + } + + dcReady := make(chan struct{}) + p.dc.OnOpen(func() { + close(dcReady) + }) + + p.dc.OnMessage(func(msg webrtc.DataChannelMessage) { + if p.onData != nil && len(msg.Data) > 0 { + p.onData(msg.Data) + } + }) + + p.pcSub.OnDataChannel(func(dc *webrtc.DataChannel) { + log.Printf("Received datachannel: %s", dc.Label()) + dc.OnMessage(func(msg webrtc.DataChannelMessage) { + if p.onData != nil && len(msg.Data) > 0 { + p.onData(msg.Data) + } + }) + }) + + ws, _, err := websocket.DefaultDialer.Dial(p.conn.ClientConfig.MediaServerURL, nil) + if err != nil { + return err + } + p.ws = ws + + if err := p.sendHello(); err != nil { + return err + } + + p.setupICEHandlers() + + go p.handleSignaling() + + select { + case <-dcReady: + return nil + case <-time.After(15 * time.Second): + return fmt.Errorf("datachannel timeout") + case <-ctx.Done(): + return ctx.Err() + } +} + +func (p *Peer) Send(data []byte) error { + return p.dc.Send(data) +} + +func (p *Peer) sendHello() error { + hello := map[string]interface{}{ + "uid": uuid.New().String(), + "hello": map[string]interface{}{ + "participantMeta": map[string]interface{}{ + "name": p.name, + "role": "SPEAKER", + "sendAudio": false, + "sendVideo": false, + }, + "participantAttributes": map[string]interface{}{ + "name": p.name, + "role": "SPEAKER", + }, + "sendAudio": false, + "sendVideo": false, + "sendSharing": false, + "participantId": p.conn.PeerID, + "roomId": p.conn.RoomID, + "serviceName": "telemost", + "credentials": p.conn.Credentials, + "capabilitiesOffer": map[string]interface{}{ + "offerAnswerMode": []string{"SEPARATE"}, + "initialSubscriberOffer": []string{"ON_HELLO"}, + "slotsMode": []string{"FROM_CONTROLLER"}, + "simulcastMode": []string{"DISABLED"}, + "selfVadStatus": []string{"FROM_SERVER"}, + "dataChannelSharing": []string{"TO_RTP"}, + }, + "sdkInfo": map[string]interface{}{ + "implementation": "go", + "version": "1.0.0", + "userAgent": "OlcRTC-" + p.name, + }, + "sdkInitializationId": uuid.New().String(), + "disablePublisher": false, + "disableSubscriber": false, + }, + } + + return p.ws.WriteJSON(hello) +} + +func (p *Peer) handleSignaling() { + pubSent := false + + for { + var msg map[string]interface{} + if err := p.ws.ReadJSON(&msg); err != nil { + log.Printf("WS read error: %v", err) + return + } + + uid, _ := msg["uid"].(string) + + if _, ok := msg["serverHello"]; ok { + p.sendAck(uid) + } + + if offer, ok := msg["subscriberSdpOffer"].(map[string]interface{}); ok && !pubSent { + sdp, _ := offer["sdp"].(string) + pcSeq, _ := offer["pcSeq"].(float64) + + if err := p.pcSub.SetRemoteDescription(webrtc.SessionDescription{ + Type: webrtc.SDPTypeOffer, + SDP: sdp, + }); err != nil { + log.Printf("SetRemoteDescription error: %v", err) + continue + } + + answer, err := p.pcSub.CreateAnswer(nil) + if err != nil { + log.Printf("CreateAnswer error: %v", err) + continue + } + + if err := p.pcSub.SetLocalDescription(answer); err != nil { + log.Printf("SetLocalDescription error: %v", err) + continue + } + + p.ws.WriteJSON(map[string]interface{}{ + "uid": uuid.New().String(), + "subscriberSdpAnswer": map[string]interface{}{ + "pcSeq": int(pcSeq), + "sdp": answer.SDP, + }, + }) + + p.sendAck(uid) + time.Sleep(300 * time.Millisecond) + + pubOffer, err := p.pcPub.CreateOffer(nil) + if err != nil { + log.Printf("CreateOffer error: %v", err) + continue + } + + if err := p.pcPub.SetLocalDescription(pubOffer); err != nil { + log.Printf("SetLocalDescription error: %v", err) + continue + } + + p.ws.WriteJSON(map[string]interface{}{ + "uid": uuid.New().String(), + "publisherSdpOffer": map[string]interface{}{ + "pcSeq": 1, + "sdp": pubOffer.SDP, + }, + }) + + pubSent = true + } + + if answer, ok := msg["publisherSdpAnswer"].(map[string]interface{}); ok { + sdp, _ := answer["sdp"].(string) + + if err := p.pcPub.SetRemoteDescription(webrtc.SessionDescription{ + Type: webrtc.SDPTypeAnswer, + SDP: sdp, + }); err != nil { + log.Printf("SetRemoteDescription error: %v", err) + } + + p.sendAck(uid) + } + + if cand, ok := msg["webrtcIceCandidate"].(map[string]interface{}); ok { + p.handleICE(cand) + } + } +} + +func (p *Peer) handleICE(cand map[string]interface{}) { + candStr, _ := cand["candidate"].(string) + target, _ := cand["target"].(string) + sdpMid, _ := cand["sdpMid"].(string) + sdpMLineIndex, _ := cand["sdpMlineIndex"].(float64) + + parts := strings.Fields(candStr) + if len(parts) < 8 { + return + } + + init := webrtc.ICECandidateInit{ + Candidate: candStr, + SDPMid: &sdpMid, + SDPMLineIndex: func() *uint16 { v := uint16(sdpMLineIndex); return &v }(), + } + + if target == "SUBSCRIBER" { + p.pcSub.AddICECandidate(init) + } else if target == "PUBLISHER" { + p.pcPub.AddICECandidate(init) + } +} + +func (p *Peer) sendAck(uid string) { + p.ws.WriteJSON(map[string]interface{}{ + "uid": uid, + "ack": map[string]interface{}{ + "status": map[string]interface{}{ + "code": "OK", + }, + }, + }) +} + +func (p *Peer) setupICEHandlers() { + p.pcSub.OnICECandidate(func(c *webrtc.ICECandidate) { + if c == nil { + return + } + + init := c.ToJSON() + p.ws.WriteJSON(map[string]interface{}{ + "uid": uuid.New().String(), + "webrtcIceCandidate": map[string]interface{}{ + "candidate": init.Candidate, + "sdpMid": init.SDPMid, + "sdpMlineIndex": init.SDPMLineIndex, + "target": "SUBSCRIBER", + "pcSeq": 1, + }, + }) + }) + + p.pcPub.OnICECandidate(func(c *webrtc.ICECandidate) { + if c == nil { + return + } + + init := c.ToJSON() + p.ws.WriteJSON(map[string]interface{}{ + "uid": uuid.New().String(), + "webrtcIceCandidate": map[string]interface{}{ + "candidate": init.Candidate, + "sdpMid": init.SDPMid, + "sdpMlineIndex": init.SDPMLineIndex, + "target": "PUBLISHER", + "pcSeq": 1, + }, + }) + }) +} + +func (p *Peer) Close() error { + if p.ws != nil { + p.ws.Close() + } + if p.pcSub != nil { + p.pcSub.Close() + } + if p.pcPub != nil { + p.pcPub.Close() + } + return nil +}