mirror of
https://github.com/openlibrecommunity/olcrtc.git
synced 2026-05-31 17:39:44 +00:00
feat(client,server): Add dual channel mode for 2x throughput
This commit is contained in:
@@ -24,6 +24,7 @@ func main() {
|
||||
keyHex string
|
||||
debug bool
|
||||
dataDir string
|
||||
duo bool
|
||||
)
|
||||
|
||||
flag.StringVar(&mode, "mode", "", "Mode: srv or cnc")
|
||||
@@ -33,6 +34,7 @@ func main() {
|
||||
flag.StringVar(&keyHex, "key", "", "Shared encryption key (hex)")
|
||||
flag.BoolVar(&debug, "debug", false, "Enable verbose logging")
|
||||
flag.StringVar(&dataDir, "data", "data", "Path to data directory")
|
||||
flag.BoolVar(&duo, "duo", false, "Use dual channels for 2x throughput")
|
||||
flag.Parse()
|
||||
|
||||
if debug {
|
||||
@@ -74,9 +76,9 @@ func main() {
|
||||
go func() {
|
||||
switch mode {
|
||||
case "srv":
|
||||
errCh <- server.Run(ctx, roomURL, keyHex)
|
||||
errCh <- server.Run(ctx, roomURL, keyHex, duo)
|
||||
case "cnc":
|
||||
errCh <- client.Run(ctx, roomURL, keyHex, socksPort)
|
||||
errCh <- client.Run(ctx, roomURL, keyHex, socksPort, duo)
|
||||
}
|
||||
}()
|
||||
|
||||
|
||||
@@ -10,6 +10,7 @@ import (
|
||||
"io"
|
||||
"log"
|
||||
"net"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/pion/webrtc/v4"
|
||||
@@ -21,13 +22,14 @@ import (
|
||||
)
|
||||
|
||||
type Client struct {
|
||||
peer *telemost.Peer
|
||||
peers []*telemost.Peer
|
||||
cipher *crypto.Cipher
|
||||
mux *mux.Multiplexer
|
||||
clientID uint32
|
||||
peerIdx atomic.Uint32
|
||||
}
|
||||
|
||||
func Run(ctx context.Context, roomURL, keyHex string, socksPort int) error {
|
||||
func Run(ctx context.Context, roomURL, keyHex string, socksPort int, duo bool) error {
|
||||
var key []byte
|
||||
var err error
|
||||
|
||||
@@ -62,6 +64,13 @@ func Run(ctx context.Context, roomURL, keyHex string, socksPort int) error {
|
||||
c := &Client{
|
||||
cipher: cipher,
|
||||
clientID: clientID,
|
||||
peers: make([]*telemost.Peer, 0),
|
||||
}
|
||||
|
||||
peerCount := 1
|
||||
if duo {
|
||||
peerCount = 2
|
||||
log.Println("Duo mode: using 2 parallel channels")
|
||||
}
|
||||
|
||||
c.mux = mux.New(c.clientID, func(frame []byte) error {
|
||||
@@ -69,36 +78,43 @@ func Run(ctx context.Context, roomURL, keyHex string, socksPort int) error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return c.peer.Send(encrypted)
|
||||
|
||||
idx := c.peerIdx.Add(1) % uint32(len(c.peers))
|
||||
return c.peers[idx].Send(encrypted)
|
||||
})
|
||||
|
||||
peer, err := telemost.NewPeer(roomURL, names.Generate(), c.onData)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
c.peer = peer
|
||||
for i := 0; i < peerCount; i++ {
|
||||
peer, err := telemost.NewPeer(roomURL, names.Generate(), c.onData)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
c.peers = append(c.peers, peer)
|
||||
|
||||
peer.SetReconnectCallback(func(dc *webrtc.DataChannel) {
|
||||
log.Println("Client reconnected - resetting multiplexer state")
|
||||
|
||||
c.mux.UpdateSendFunc(func(frame []byte) error {
|
||||
encrypted, err := c.cipher.Encrypt(frame)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return c.peer.Send(encrypted)
|
||||
peer.SetReconnectCallback(func(dc *webrtc.DataChannel) {
|
||||
log.Printf("Client peer %d reconnected - resetting multiplexer state", i)
|
||||
|
||||
c.mux.UpdateSendFunc(func(frame []byte) error {
|
||||
encrypted, err := c.cipher.Encrypt(frame)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
idx := c.peerIdx.Add(1) % uint32(len(c.peers))
|
||||
return c.peers[idx].Send(encrypted)
|
||||
})
|
||||
|
||||
c.mux.Reset()
|
||||
|
||||
log.Println("Client multiplexer reset complete")
|
||||
})
|
||||
|
||||
c.mux.Reset()
|
||||
|
||||
log.Println("Client multiplexer reset complete")
|
||||
})
|
||||
|
||||
log.Println("Connecting to Telemost...")
|
||||
if err := peer.Connect(ctx); err != nil {
|
||||
return err
|
||||
log.Printf("Connecting peer %d to Telemost...", i)
|
||||
if err := peer.Connect(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
log.Printf("Peer %d connected", i)
|
||||
|
||||
go peer.WatchConnection(ctx)
|
||||
}
|
||||
log.Println("Connected to Telemost")
|
||||
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
|
||||
@@ -107,11 +123,12 @@ func Run(ctx context.Context, roomURL, keyHex string, socksPort int) error {
|
||||
binary.BigEndian.PutUint16(resetFrame[4:6], 0xFFFF)
|
||||
binary.BigEndian.PutUint16(resetFrame[6:8], 0xFFFF)
|
||||
encrypted, _ := cipher.Encrypt(resetFrame)
|
||||
peer.Send(encrypted)
|
||||
|
||||
for _, peer := range c.peers {
|
||||
peer.Send(encrypted)
|
||||
}
|
||||
log.Printf("Sent reset signal to server (clientID=%d)", c.clientID)
|
||||
|
||||
go peer.WatchConnection(ctx)
|
||||
|
||||
return c.runSOCKS5(ctx, socksPort)
|
||||
}
|
||||
|
||||
|
||||
@@ -10,6 +10,7 @@ import (
|
||||
"log"
|
||||
"net"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/pion/webrtc/v4"
|
||||
@@ -21,11 +22,12 @@ import (
|
||||
)
|
||||
|
||||
type Server struct {
|
||||
peer *telemost.Peer
|
||||
peers []*telemost.Peer
|
||||
cipher *crypto.Cipher
|
||||
mux *mux.Multiplexer
|
||||
connections map[uint16]net.Conn
|
||||
connMu sync.RWMutex
|
||||
peerIdx atomic.Uint32
|
||||
}
|
||||
|
||||
type ConnectRequest struct {
|
||||
@@ -34,7 +36,7 @@ type ConnectRequest struct {
|
||||
Port int `json:"port"`
|
||||
}
|
||||
|
||||
func Run(ctx context.Context, roomURL, keyHex string) error {
|
||||
func Run(ctx context.Context, roomURL, keyHex string, duo bool) error {
|
||||
var key []byte
|
||||
var err error
|
||||
|
||||
@@ -67,6 +69,13 @@ func Run(ctx context.Context, roomURL, keyHex string) error {
|
||||
s := &Server{
|
||||
cipher: cipher,
|
||||
connections: make(map[uint16]net.Conn),
|
||||
peers: make([]*telemost.Peer, 0),
|
||||
}
|
||||
|
||||
peerCount := 1
|
||||
if duo {
|
||||
peerCount = 2
|
||||
log.Println("Duo mode: using 2 parallel channels")
|
||||
}
|
||||
|
||||
s.mux = mux.New(0, func(frame []byte) error {
|
||||
@@ -74,49 +83,53 @@ func Run(ctx context.Context, roomURL, keyHex string) error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return s.peer.Send(encrypted)
|
||||
idx := s.peerIdx.Add(1) % uint32(len(s.peers))
|
||||
return s.peers[idx].Send(encrypted)
|
||||
})
|
||||
|
||||
peer, err := telemost.NewPeer(roomURL, names.Generate(), s.onData)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
s.peer = peer
|
||||
|
||||
peer.SetReconnectCallback(func(dc *webrtc.DataChannel) {
|
||||
log.Println("Server reconnected - resetting multiplexer state")
|
||||
|
||||
s.connMu.Lock()
|
||||
for sid, conn := range s.connections {
|
||||
if conn != nil {
|
||||
conn.Close()
|
||||
}
|
||||
delete(s.connections, sid)
|
||||
for i := 0; i < peerCount; i++ {
|
||||
peer, err := telemost.NewPeer(roomURL, names.Generate(), s.onData)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
s.connMu.Unlock()
|
||||
|
||||
if dc != nil {
|
||||
s.mux.UpdateSendFunc(func(frame []byte) error {
|
||||
encrypted, err := s.cipher.Encrypt(frame)
|
||||
if err != nil {
|
||||
return err
|
||||
s.peers = append(s.peers, peer)
|
||||
|
||||
peer.SetReconnectCallback(func(dc *webrtc.DataChannel) {
|
||||
log.Printf("Server peer %d reconnected - resetting multiplexer state", i)
|
||||
|
||||
s.connMu.Lock()
|
||||
for sid, conn := range s.connections {
|
||||
if conn != nil {
|
||||
conn.Close()
|
||||
}
|
||||
return s.peer.Send(encrypted)
|
||||
})
|
||||
delete(s.connections, sid)
|
||||
}
|
||||
s.connMu.Unlock()
|
||||
|
||||
if dc != nil {
|
||||
s.mux.UpdateSendFunc(func(frame []byte) error {
|
||||
encrypted, err := s.cipher.Encrypt(frame)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
idx := s.peerIdx.Add(1) % uint32(len(s.peers))
|
||||
return s.peers[idx].Send(encrypted)
|
||||
})
|
||||
}
|
||||
|
||||
s.mux.Reset()
|
||||
|
||||
log.Println("Server multiplexer reset complete")
|
||||
})
|
||||
|
||||
log.Printf("Connecting peer %d to Telemost...", i)
|
||||
if err := peer.Connect(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
s.mux.Reset()
|
||||
|
||||
log.Println("Server multiplexer reset complete")
|
||||
})
|
||||
log.Printf("Peer %d connected", i)
|
||||
|
||||
log.Println("Connecting to Telemost...")
|
||||
if err := peer.Connect(ctx); err != nil {
|
||||
return err
|
||||
go peer.WatchConnection(ctx)
|
||||
}
|
||||
log.Println("Connected to Telemost")
|
||||
|
||||
go peer.WatchConnection(ctx)
|
||||
|
||||
return s.run(ctx)
|
||||
}
|
||||
@@ -166,7 +179,9 @@ func (s *Server) run(ctx context.Context) error {
|
||||
}
|
||||
}
|
||||
s.connMu.Unlock()
|
||||
s.peer.Close()
|
||||
for _, peer := range s.peers {
|
||||
peer.Close()
|
||||
}
|
||||
return nil
|
||||
default:
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user