From cb78320aaa851a43ef660fc8606a7b3a213e6c91 Mon Sep 17 00:00:00 2001 From: zarazaex69 Date: Thu, 9 Apr 2026 19:07:12 +0300 Subject: [PATCH] feat(client,server): Add dual channel mode for 2x throughput --- cmd/olcrtc/main.go | 6 ++- internal/client/client.go | 75 +++++++++++++++++++------------ internal/server/server.go | 93 +++++++++++++++++++++++---------------- 3 files changed, 104 insertions(+), 70 deletions(-) diff --git a/cmd/olcrtc/main.go b/cmd/olcrtc/main.go index 3063e80..7073785 100644 --- a/cmd/olcrtc/main.go +++ b/cmd/olcrtc/main.go @@ -24,6 +24,7 @@ func main() { keyHex string debug bool dataDir string + duo bool ) flag.StringVar(&mode, "mode", "", "Mode: srv or cnc") @@ -33,6 +34,7 @@ func main() { flag.StringVar(&keyHex, "key", "", "Shared encryption key (hex)") flag.BoolVar(&debug, "debug", false, "Enable verbose logging") flag.StringVar(&dataDir, "data", "data", "Path to data directory") + flag.BoolVar(&duo, "duo", false, "Use dual channels for 2x throughput") flag.Parse() if debug { @@ -74,9 +76,9 @@ func main() { go func() { switch mode { case "srv": - errCh <- server.Run(ctx, roomURL, keyHex) + errCh <- server.Run(ctx, roomURL, keyHex, duo) case "cnc": - errCh <- client.Run(ctx, roomURL, keyHex, socksPort) + errCh <- client.Run(ctx, roomURL, keyHex, socksPort, duo) } }() diff --git a/internal/client/client.go b/internal/client/client.go index c1df820..dea7a1c 100644 --- a/internal/client/client.go +++ b/internal/client/client.go @@ -10,6 +10,7 @@ import ( "io" "log" "net" + "sync/atomic" "time" "github.com/pion/webrtc/v4" @@ -21,13 +22,14 @@ import ( ) type Client struct { - peer *telemost.Peer + peers []*telemost.Peer cipher *crypto.Cipher mux *mux.Multiplexer clientID uint32 + peerIdx atomic.Uint32 } -func Run(ctx context.Context, roomURL, keyHex string, socksPort int) error { +func Run(ctx context.Context, roomURL, keyHex string, socksPort int, duo bool) error { var key []byte var err error @@ -62,6 +64,13 @@ func Run(ctx context.Context, roomURL, keyHex string, socksPort int) error { c := &Client{ cipher: cipher, clientID: clientID, + peers: make([]*telemost.Peer, 0), + } + + peerCount := 1 + if duo { + peerCount = 2 + log.Println("Duo mode: using 2 parallel channels") } c.mux = mux.New(c.clientID, func(frame []byte) error { @@ -69,36 +78,43 @@ func Run(ctx context.Context, roomURL, keyHex string, socksPort int) error { if err != nil { return err } - return c.peer.Send(encrypted) + + idx := c.peerIdx.Add(1) % uint32(len(c.peers)) + return c.peers[idx].Send(encrypted) }) - peer, err := telemost.NewPeer(roomURL, names.Generate(), c.onData) - if err != nil { - return err - } - c.peer = peer + for i := 0; i < peerCount; i++ { + peer, err := telemost.NewPeer(roomURL, names.Generate(), c.onData) + if err != nil { + return err + } + c.peers = append(c.peers, peer) - peer.SetReconnectCallback(func(dc *webrtc.DataChannel) { - log.Println("Client reconnected - resetting multiplexer state") - - c.mux.UpdateSendFunc(func(frame []byte) error { - encrypted, err := c.cipher.Encrypt(frame) - if err != nil { - return err - } - return c.peer.Send(encrypted) + peer.SetReconnectCallback(func(dc *webrtc.DataChannel) { + log.Printf("Client peer %d reconnected - resetting multiplexer state", i) + + c.mux.UpdateSendFunc(func(frame []byte) error { + encrypted, err := c.cipher.Encrypt(frame) + if err != nil { + return err + } + idx := c.peerIdx.Add(1) % uint32(len(c.peers)) + return c.peers[idx].Send(encrypted) + }) + + c.mux.Reset() + + log.Println("Client multiplexer reset complete") }) - - c.mux.Reset() - - log.Println("Client multiplexer reset complete") - }) - log.Println("Connecting to Telemost...") - if err := peer.Connect(ctx); err != nil { - return err + log.Printf("Connecting peer %d to Telemost...", i) + if err := peer.Connect(ctx); err != nil { + return err + } + log.Printf("Peer %d connected", i) + + go peer.WatchConnection(ctx) } - log.Println("Connected to Telemost") time.Sleep(100 * time.Millisecond) @@ -107,11 +123,12 @@ func Run(ctx context.Context, roomURL, keyHex string, socksPort int) error { binary.BigEndian.PutUint16(resetFrame[4:6], 0xFFFF) binary.BigEndian.PutUint16(resetFrame[6:8], 0xFFFF) encrypted, _ := cipher.Encrypt(resetFrame) - peer.Send(encrypted) + + for _, peer := range c.peers { + peer.Send(encrypted) + } log.Printf("Sent reset signal to server (clientID=%d)", c.clientID) - go peer.WatchConnection(ctx) - return c.runSOCKS5(ctx, socksPort) } diff --git a/internal/server/server.go b/internal/server/server.go index a10990f..12f262b 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -10,6 +10,7 @@ import ( "log" "net" "sync" + "sync/atomic" "time" "github.com/pion/webrtc/v4" @@ -21,11 +22,12 @@ import ( ) type Server struct { - peer *telemost.Peer + peers []*telemost.Peer cipher *crypto.Cipher mux *mux.Multiplexer connections map[uint16]net.Conn connMu sync.RWMutex + peerIdx atomic.Uint32 } type ConnectRequest struct { @@ -34,7 +36,7 @@ type ConnectRequest struct { Port int `json:"port"` } -func Run(ctx context.Context, roomURL, keyHex string) error { +func Run(ctx context.Context, roomURL, keyHex string, duo bool) error { var key []byte var err error @@ -67,6 +69,13 @@ func Run(ctx context.Context, roomURL, keyHex string) error { s := &Server{ cipher: cipher, connections: make(map[uint16]net.Conn), + peers: make([]*telemost.Peer, 0), + } + + peerCount := 1 + if duo { + peerCount = 2 + log.Println("Duo mode: using 2 parallel channels") } s.mux = mux.New(0, func(frame []byte) error { @@ -74,49 +83,53 @@ func Run(ctx context.Context, roomURL, keyHex string) error { if err != nil { return err } - return s.peer.Send(encrypted) + idx := s.peerIdx.Add(1) % uint32(len(s.peers)) + return s.peers[idx].Send(encrypted) }) - peer, err := telemost.NewPeer(roomURL, names.Generate(), s.onData) - if err != nil { - return err - } - s.peer = peer - - peer.SetReconnectCallback(func(dc *webrtc.DataChannel) { - log.Println("Server reconnected - resetting multiplexer state") - - s.connMu.Lock() - for sid, conn := range s.connections { - if conn != nil { - conn.Close() - } - delete(s.connections, sid) + for i := 0; i < peerCount; i++ { + peer, err := telemost.NewPeer(roomURL, names.Generate(), s.onData) + if err != nil { + return err } - s.connMu.Unlock() - - if dc != nil { - s.mux.UpdateSendFunc(func(frame []byte) error { - encrypted, err := s.cipher.Encrypt(frame) - if err != nil { - return err + s.peers = append(s.peers, peer) + + peer.SetReconnectCallback(func(dc *webrtc.DataChannel) { + log.Printf("Server peer %d reconnected - resetting multiplexer state", i) + + s.connMu.Lock() + for sid, conn := range s.connections { + if conn != nil { + conn.Close() } - return s.peer.Send(encrypted) - }) + delete(s.connections, sid) + } + s.connMu.Unlock() + + if dc != nil { + s.mux.UpdateSendFunc(func(frame []byte) error { + encrypted, err := s.cipher.Encrypt(frame) + if err != nil { + return err + } + idx := s.peerIdx.Add(1) % uint32(len(s.peers)) + return s.peers[idx].Send(encrypted) + }) + } + + s.mux.Reset() + + log.Println("Server multiplexer reset complete") + }) + + log.Printf("Connecting peer %d to Telemost...", i) + if err := peer.Connect(ctx); err != nil { + return err } - - s.mux.Reset() - - log.Println("Server multiplexer reset complete") - }) + log.Printf("Peer %d connected", i) - log.Println("Connecting to Telemost...") - if err := peer.Connect(ctx); err != nil { - return err + go peer.WatchConnection(ctx) } - log.Println("Connected to Telemost") - - go peer.WatchConnection(ctx) return s.run(ctx) } @@ -166,7 +179,9 @@ func (s *Server) run(ctx context.Context) error { } } s.connMu.Unlock() - s.peer.Close() + for _, peer := range s.peers { + peer.Close() + } return nil default: }