diff --git a/.dockerignore b/.dockerignore new file mode 100644 index 0000000..ce8f9a8 --- /dev/null +++ b/.dockerignore @@ -0,0 +1,18 @@ +.git +.gitignore + +Dockerfile +docker-compose*.yml + +code/ +doc/ +asset/ + +*.log +*.tmp +tmp/ + +olcrtc +build/ +dist/ +coverage.out diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..1da6b1e --- /dev/null +++ b/Dockerfile @@ -0,0 +1,54 @@ +# syntax=docker/dockerfile:1.7 + +ARG GO_VERSION=1.25 +ARG ALPINE_VERSION=3.22 + +FROM golang:${GO_VERSION}-alpine${ALPINE_VERSION} AS build + +WORKDIR /src + +RUN apk add --no-cache ca-certificates git + +COPY go.mod go.sum ./ +RUN --mount=type=cache,target=/go/pkg/mod \ + go mod download + +COPY . . + +ARG TARGETOS=linux +ARG TARGETARCH=amd64 + +RUN --mount=type=cache,target=/go/pkg/mod \ + --mount=type=cache,target=/root/.cache/go-build \ + CGO_ENABLED=0 GOOS=${TARGETOS} GOARCH=${TARGETARCH} \ + go build -trimpath -ldflags="-s -w" -o /out/olcrtc ./cmd/olcrtc + +FROM alpine:${ALPINE_VERSION} AS runtime + +RUN apk add --no-cache ca-certificates tzdata && \ + addgroup -S olcrtc && \ + mkdir -p /usr/share/olcrtc /var/lib/olcrtc && \ + adduser -S -D -h /var/lib/olcrtc -s /sbin/nologin -G olcrtc olcrtc && \ + chown -R olcrtc:olcrtc /var/lib/olcrtc + +COPY --from=build /out/olcrtc /usr/local/bin/olcrtc +COPY script/docker/olcrtc-entrypoint.sh /usr/local/bin/olcrtc-entrypoint +COPY script/docker/olcrtc-healthcheck.sh /usr/local/bin/olcrtc-healthcheck + +RUN chmod 0755 /usr/local/bin/olcrtc /usr/local/bin/olcrtc-entrypoint /usr/local/bin/olcrtc-healthcheck + +USER olcrtc:olcrtc +WORKDIR /var/lib/olcrtc + +ENV OLCRTC_MODE=srv \ + OLCRTC_PROVIDER=telemost \ + OLCRTC_DATA_DIR=/usr/share/olcrtc \ + OLCRTC_DNS=1.1.1.1:53 \ + OLCRTC_KEY_FILE=/var/lib/olcrtc/key.hex + +VOLUME ["/var/lib/olcrtc"] + +HEALTHCHECK --interval=30s --timeout=3s --start-period=20s --retries=3 \ + CMD ["/usr/local/bin/olcrtc-healthcheck"] + +ENTRYPOINT ["/usr/local/bin/olcrtc-entrypoint"] diff --git a/cmd/olcrtc/main.go b/cmd/olcrtc/main.go index fd378ae..775d7c5 100644 --- a/cmd/olcrtc/main.go +++ b/cmd/olcrtc/main.go @@ -1,8 +1,11 @@ +// Package main provides the olcrtc CLI entrypoint. package main import ( "context" + "errors" "flag" + "fmt" "log" "os" "os/signal" @@ -16,68 +19,51 @@ import ( "github.com/openlibrecommunity/olcrtc/internal/server" ) +type config struct { + mode string + roomID string + provider string + socksPort int + socksHost string + keyHex string + debug bool + dataDir string + duo bool + dnsServer string + socksProxyAddr string + socksProxyPort int +} + +var ( + errUnsupportedProvider = errors.New("only telemost provider supported") + errRoomIDRequired = errors.New("room ID required") + errModeRequired = errors.New("specify -mode srv or -mode cnc") +) + func main() { - var ( - mode string - roomID string - provider string - socksPort int - keyHex string - debug bool - dataDir string - duo bool - dnsServer string - socksProxyAddr string - socksProxyPort int - ) + if err := run(); err != nil { + log.Print(err) + os.Exit(1) + } +} - flag.StringVar(&mode, "mode", "", "Mode: srv or cnc") - flag.StringVar(&roomID, "id", "", "Telemost room ID") - flag.StringVar(&provider, "provider", "telemost", "Provider (telemost only)") - flag.IntVar(&socksPort, "socks-port", 1080, "SOCKS5 port (client only)") - flag.StringVar(&keyHex, "key", "", "Shared encryption key (hex)") - flag.BoolVar(&debug, "debug", false, "Enable verbose logging") - flag.StringVar(&dataDir, "data", "data", "Path to data directory") - flag.BoolVar(&duo, "duo", false, "Use dual channels for 2x throughput") - flag.StringVar(&dnsServer, "dns", "1.1.1.1:53", "DNS server (default: Cloudflare 1.1.1.1)") - flag.StringVar(&socksProxyAddr, "socks-proxy", "", "SOCKS5 proxy address (server only)") - flag.IntVar(&socksProxyPort, "socks-proxy-port", 1080, "SOCKS5 proxy port (server only)") - flag.Parse() +func run() error { + cfg := parseFlags() + configureLogging(cfg.debug) - if debug { - log.SetFlags(log.Ltime | log.Lshortfile) - logger.SetVerbose(true) - } else { - log.SetFlags(log.Ltime) + if err := validateConfig(cfg); err != nil { + return err } - if provider != "telemost" { - log.Fatal("Only telemost provider supported") + dataDir, err := resolveDataDir(cfg.dataDir) + if err != nil { + return err } - if roomID == "" { - log.Fatal("Room ID required") + if err := loadNames(dataDir); err != nil { + return err } - if mode != "srv" && mode != "cnc" { - log.Fatal("Specify -mode srv or -mode cnc") - } - - if !filepath.IsAbs(dataDir) { - exePath, err := os.Executable() - if err == nil { - exeDir := filepath.Dir(exePath) - dataDir = filepath.Join(exeDir, dataDir) - } - } - - namesPath := filepath.Join(dataDir, "names") - surnamesPath := filepath.Join(dataDir, "surnames") - - names.LoadNameFiles(namesPath, surnamesPath) - - roomURL := "https://telemost.yandex.ru/j/" + roomID - ctx, cancel := context.WithCancel(context.Background()) defer cancel() @@ -85,36 +71,126 @@ func main() { signal.Notify(sigCh, os.Interrupt, syscall.SIGTERM) errCh := make(chan error, 1) - - go func() { - switch mode { - case "srv": - errCh <- server.Run(ctx, roomURL, keyHex, duo, dnsServer, socksProxyAddr, socksProxyPort) - case "cnc": - errCh <- client.Run(ctx, roomURL, keyHex, socksPort, duo) - } - }() + go runMode(ctx, cfg, errCh) select { case <-sigCh: log.Println("Shutting down gracefully...") cancel() - - done := make(chan struct{}) - go func() { - <-errCh - close(done) - }() - - select { - case <-done: - log.Println("Shutdown complete") - case <-time.After(5 * time.Second): - log.Println("Shutdown timeout, forcing exit") - } + return waitForShutdown(errCh) case err := <-errCh: - if err != nil { - log.Fatal(err) - } + return err + } +} + +func parseFlags() config { + cfg := config{} + + flag.StringVar(&cfg.mode, "mode", "", "Mode: srv or cnc") + flag.StringVar(&cfg.roomID, "id", "", "Telemost room ID") + flag.StringVar(&cfg.provider, "provider", "telemost", "Provider (telemost only)") + flag.IntVar(&cfg.socksPort, "socks-port", 1080, "SOCKS5 port (client only)") + flag.StringVar(&cfg.socksHost, "socks-host", "127.0.0.1", "SOCKS5 listen host (client only)") + flag.StringVar(&cfg.keyHex, "key", "", "Shared encryption key (hex)") + flag.BoolVar(&cfg.debug, "debug", false, "Enable verbose logging") + flag.StringVar(&cfg.dataDir, "data", "data", "Path to data directory") + flag.BoolVar(&cfg.duo, "duo", false, "Use dual channels for 2x throughput") + flag.StringVar(&cfg.dnsServer, "dns", "1.1.1.1:53", "DNS server (default: Cloudflare 1.1.1.1)") + flag.StringVar(&cfg.socksProxyAddr, "socks-proxy", "", "SOCKS5 proxy address (server only)") + flag.IntVar(&cfg.socksProxyPort, "socks-proxy-port", 1080, "SOCKS5 proxy port (server only)") + flag.Parse() + + return cfg +} + +func configureLogging(debug bool) { + if debug { + log.SetFlags(log.Ltime | log.Lshortfile) + logger.SetVerbose(true) + return + } + + log.SetFlags(log.Ltime) +} + +func validateConfig(cfg config) error { + switch { + case cfg.provider != "telemost": + return errUnsupportedProvider + case cfg.roomID == "": + return errRoomIDRequired + case cfg.mode != "srv" && cfg.mode != "cnc": + return errModeRequired + default: + return nil + } +} + +func resolveDataDir(dataDir string) (string, error) { + if filepath.IsAbs(dataDir) { + return dataDir, nil + } + + exePath, err := os.Executable() + if err != nil { + return "", fmt.Errorf("resolve executable path: %w", err) + } + + return filepath.Join(filepath.Dir(exePath), dataDir), nil +} + +func loadNames(dataDir string) error { + namesPath := filepath.Join(dataDir, "names") + surnamesPath := filepath.Join(dataDir, "surnames") + if err := names.LoadNameFiles(namesPath, surnamesPath); err != nil { + return fmt.Errorf("load embedded names override: %w", err) + } + + return nil +} + +func runMode(ctx context.Context, cfg config, errCh chan<- error) { + roomURL := "https://telemost.yandex.ru/j/" + cfg.roomID + + switch cfg.mode { + case "srv": + errCh <- server.Run( + ctx, + roomURL, + cfg.keyHex, + cfg.duo, + cfg.dnsServer, + cfg.socksProxyAddr, + cfg.socksProxyPort, + ) + case "cnc": + errCh <- client.Run( + ctx, + roomURL, + cfg.keyHex, + cfg.socksPort, + cfg.duo, + cfg.socksHost, + "", + "", + ) + } +} + +func waitForShutdown(errCh <-chan error) error { + done := make(chan error, 1) + go func() { + done <- <-errCh + }() + + select { + case err := <-done: + if err == nil { + log.Println("Shutdown complete") + } + return err + case <-time.After(5 * time.Second): + log.Println("Shutdown timeout, forcing exit") + return nil } } diff --git a/docker-compose.server.yml b/docker-compose.server.yml new file mode 100644 index 0000000..522de34 --- /dev/null +++ b/docker-compose.server.yml @@ -0,0 +1,19 @@ +services: + olcrtc-server: + build: + context: . + image: olcrtc/server:local + container_name: olcrtc-server + restart: unless-stopped + environment: + OLCRTC_ROOM_ID: "${OLCRTC_ROOM_ID:?set OLCRTC_ROOM_ID}" + OLCRTC_KEY: "${OLCRTC_KEY:-}" + OLCRTC_DNS: "${OLCRTC_DNS:-1.1.1.1:53}" + OLCRTC_DUO: "${OLCRTC_DUO:-false}" + OLCRTC_DEBUG: "${OLCRTC_DEBUG:-false}" + volumes: + - olcrtc-state:/var/lib/olcrtc + init: true + +volumes: + olcrtc-state: diff --git a/internal/client/client.go b/internal/client/client.go index 8185656..f89ccc1 100644 --- a/internal/client/client.go +++ b/internal/client/client.go @@ -1,3 +1,4 @@ +// Package client implements the local SOCKS5 client side of the olcrtc tunnel. package client import ( @@ -6,22 +7,31 @@ import ( "encoding/binary" "encoding/hex" "encoding/json" + "errors" "fmt" "io" "log" "net" + "strconv" "sync" "sync/atomic" "time" - "github.com/pion/webrtc/v4" "github.com/openlibrecommunity/olcrtc/internal/crypto" "github.com/openlibrecommunity/olcrtc/internal/logger" "github.com/openlibrecommunity/olcrtc/internal/mux" "github.com/openlibrecommunity/olcrtc/internal/names" "github.com/openlibrecommunity/olcrtc/internal/telemost" + "github.com/pion/webrtc/v4" ) +var ( + errInvalidKeyLength = errors.New("key must be 32 bytes") + errInvalidKeyStringLength = errors.New("key string length must be 32") + errNoConnectedPeers = errors.New("no connected peers available") +) + +// Client manages the client-side mux and SOCKS5 listener. type Client struct { peers []*telemost.Peer cipher *crypto.Cipher @@ -31,134 +41,226 @@ type Client struct { wg sync.WaitGroup } -func Run(ctx context.Context, roomURL, keyHex string, socksPort int, duo bool) error { - var key []byte - var err error +const defaultSOCKSListenHost = "127.0.0.1" - if keyHex == "" { - key = make([]byte, 32) - if _, err := rand.Read(key); err != nil { - return err - } - log.Printf("Generated key: %x", key) - } else { - key, err = hex.DecodeString(keyHex) - if err != nil { - return err - } - if len(key) != 32 { - return fmt.Errorf("key must be 32 bytes, got %d", len(key)) - } - } +// Run starts the client and listens for SOCKS5 traffic. +func Run( + ctx context.Context, + roomURL, + keyHex string, + socksPort int, + duo bool, + socksHost, + socksUser, + socksPass string, +) error { + return RunWithReady(ctx, roomURL, keyHex, socksPort, duo, socksHost, socksUser, socksPass, nil) +} - keyStr := string(key) - if len(keyStr) != 32 { - return fmt.Errorf("key string length must be 32, got %d", len(keyStr)) - } +// RunWithReady starts the client and invokes onReady once the local SOCKS5 listener is accepting connections. +func RunWithReady( + ctx context.Context, + roomURL, + keyHex string, + socksPort int, + duo bool, + socksHost, + socksUser, + socksPass string, + onReady func(), +) error { + runCtx, cancel := context.WithCancel(ctx) + defer cancel() - cipher, err := crypto.NewCipher(keyStr) + key, err := decodeKey(keyHex) if err != nil { return err } - clientID := uint32(time.Now().UnixNano() & 0xFFFFFFFF) + keyStr := string(key) + if len(keyStr) != 32 { + return fmt.Errorf("%w: got %d", errInvalidKeyStringLength, len(keyStr)) + } + + cipher, err := crypto.NewCipher(keyStr) + if err != nil { + return fmt.Errorf("create cipher: %w", err) + } c := &Client{ cipher: cipher, - clientID: clientID, - peers: make([]*telemost.Peer, 0), + clientID: uint32(time.Now().UnixNano() & 0xFFFFFFFF), + peers: make([]*telemost.Peer, 0, peerCount(duo)), } - peerCount := 1 - if duo { - peerCount = 2 - log.Println("Duo mode: using 2 parallel channels") - } + c.mux = mux.New(c.clientID, c.sendFrame) - c.mux = mux.New(c.clientID, func(frame []byte) error { - for { - canSend := true - for _, peer := range c.peers { - if !peer.CanSend() { - canSend = false - break - } - } - if canSend { - break - } - time.Sleep(10 * time.Millisecond) - } - - encrypted, err := c.cipher.Encrypt(frame) - if err != nil { + for peerID := range peerCount(duo) { + if err := c.addPeer(runCtx, roomURL, peerID, cancel); err != nil { return err } - - idx := c.peerIdx.Add(1) % uint32(len(c.peers)) - return c.peers[idx].Send(encrypted) - }) - - for i := 0; i < peerCount; i++ { - peer, err := telemost.NewPeer(roomURL, names.Generate(), c.onData) - if err != nil { - return err - } - c.peers = append(c.peers, peer) - - peer.SetReconnectCallback(func(dc *webrtc.DataChannel) { - log.Printf("Client peer %d reconnected - resetting multiplexer state", i) - - c.mux.UpdateSendFunc(func(frame []byte) error { - encrypted, err := c.cipher.Encrypt(frame) - if err != nil { - return err - } - idx := c.peerIdx.Add(1) % uint32(len(c.peers)) - return c.peers[idx].Send(encrypted) - }) - - c.mux.Reset() - - log.Println("Client multiplexer reset complete") - }) - - log.Printf("Connecting peer %d to Telemost...", i) - if err := peer.Connect(ctx); err != nil { - return err - } - log.Printf("Peer %d connected", i) - - c.wg.Add(1) - go func() { - defer c.wg.Done() - peer.WatchConnection(ctx) - }() } time.Sleep(100 * time.Millisecond) - - resetFrame := make([]byte, 12) - binary.BigEndian.PutUint32(resetFrame[0:4], c.clientID) - binary.BigEndian.PutUint16(resetFrame[4:6], 0xFFFF) - binary.BigEndian.PutUint16(resetFrame[6:8], 0xFFFF) - binary.BigEndian.PutUint32(resetFrame[8:12], 0) - encrypted, _ := cipher.Encrypt(resetFrame) - - for _, peer := range c.peers { - peer.Send(encrypted) - } - log.Printf("Sent reset signal to server (clientID=%d)", c.clientID) + c.sendResetSignal() + + err = c.runSOCKS5(runCtx, socksHost, socksPort, socksUser, socksPass, onReady) - err = c.runSOCKS5(ctx, socksPort) - log.Println("Waiting for client goroutines...") c.wg.Wait() log.Println("Client goroutines finished") - + return err } +func peerCount(duo bool) int { + if duo { + log.Println("Duo mode: using 2 parallel channels") + return 2 + } + + return 1 +} + +func decodeKey(keyHex string) ([]byte, error) { + if keyHex == "" { + key := make([]byte, 32) + if _, err := rand.Read(key); err != nil { + return nil, fmt.Errorf("generate random key: %w", err) + } + + log.Printf("Generated key: %x", key) + return key, nil + } + + key, err := hex.DecodeString(keyHex) + if err != nil { + return nil, fmt.Errorf("decode hex key: %w", err) + } + + if len(key) != 32 { + return nil, fmt.Errorf("%w: got %d", errInvalidKeyLength, len(key)) + } + + return key, nil +} + +func (c *Client) sendFrame(frame []byte) error { + waitUntilPeersCanSend(c.peers) + + encrypted, err := c.cipher.Encrypt(frame) + if err != nil { + return fmt.Errorf("encrypt outgoing frame: %w", err) + } + + peer, err := c.nextPeer() + if err != nil { + return err + } + + if err := peer.Send(encrypted); err != nil { + return fmt.Errorf("send frame via peer: %w", err) + } + + return nil +} + +func waitUntilPeersCanSend(peers []*telemost.Peer) { + for { + canSend := true + for _, peer := range peers { + if !peer.CanSend() { + canSend = false + break + } + } + + if canSend { + return + } + + time.Sleep(10 * time.Millisecond) + } +} + +func (c *Client) nextPeer() (*telemost.Peer, error) { + switch len(c.peers) { + case 0: + return nil, errNoConnectedPeers + case 1: + return c.peers[0], nil + default: + return c.peers[int(c.peerIdx.Add(1)%2)], nil + } +} + +func (c *Client) addPeer( + runCtx context.Context, + roomURL string, + peerID int, + cancel context.CancelFunc, +) error { + peer, err := telemost.NewPeer(roomURL, names.Generate(), c.onData) + if err != nil { + return fmt.Errorf("create peer %d: %w", peerID, err) + } + + peer.SetEndedCallback(func(reason string) { + log.Printf("Client peer %d reported conference end: %s", peerID, reason) + cancel() + }) + + peer.SetReconnectCallback(func(dc *webrtc.DataChannel) { + c.onReconnect(peerID, dc) + }) + + c.peers = append(c.peers, peer) + + log.Printf("Connecting peer %d to Telemost...", peerID) + if err := peer.Connect(runCtx); err != nil { + return fmt.Errorf("connect peer %d: %w", peerID, err) + } + log.Printf("Peer %d connected", peerID) + + c.wg.Add(1) + go func() { + defer c.wg.Done() + peer.WatchConnection(runCtx) + }() + + return nil +} + +func (c *Client) onReconnect(peerID int, dc *webrtc.DataChannel) { + if dc == nil { + log.Printf("Client peer %d channel closed - resetting multiplexer state", peerID) + } else { + log.Printf("Client peer %d reconnected - resetting multiplexer state", peerID) + } + + c.mux.UpdateSendFunc(c.sendFrame) + c.mux.Reset() + + log.Println("Client multiplexer reset complete") +} + +func (c *Client) sendResetSignal() { + resetFrame := mux.BuildControlFrame(c.clientID, mux.ControlResetClient) + encrypted, err := c.cipher.Encrypt(resetFrame) + if err != nil { + log.Printf("Failed to encrypt reset signal: %v", err) + return + } + + for _, peer := range c.peers { + if err := peer.Send(encrypted); err != nil { + log.Printf("Failed to send reset signal to server: %v", err) + } + } + + log.Printf("Sent reset signal to server (clientID=%d)", c.clientID) +} + func (c *Client) onData(data []byte) { plaintext, err := c.cipher.Decrypt(data) if err != nil { @@ -169,18 +271,36 @@ func (c *Client) onData(data []byte) { c.mux.HandleFrame(plaintext) } -func (c *Client) runSOCKS5(ctx context.Context, port int) error { - listener, err := net.Listen("tcp", fmt.Sprintf("0.0.0.0:%d", port)) - if err != nil { - return err +func (c *Client) runSOCKS5( + ctx context.Context, + host string, + port int, + username, + password string, + onReady func(), +) error { + if host == "" { + host = defaultSOCKSListenHost } - log.Printf("SOCKS5 proxy listening on 0.0.0.0:%d", port) + listenAddr := net.JoinHostPort(host, strconv.Itoa(port)) + var lc net.ListenConfig + listener, err := lc.Listen(ctx, "tcp", listenAddr) + if err != nil { + return fmt.Errorf("listen on %s: %w", listenAddr, err) + } + + log.Printf("SOCKS5 proxy listening on %s (auth=%v)", listenAddr, username != "") + if onReady != nil { + onReady() + } go func() { <-ctx.Done() log.Println("Closing SOCKS5 listener...") - listener.Close() + if err := listener.Close(); err != nil { + logger.Debug("SOCKS5 listener close error: %v", err) + } }() for { @@ -189,11 +309,7 @@ func (c *Client) runSOCKS5(ctx context.Context, port int) error { select { case <-ctx.Done(): log.Println("SOCKS5 listener closed") - - for _, peer := range c.peers { - peer.Close() - } - + c.closePeers() return nil default: log.Printf("Accept error: %v", err) @@ -201,20 +317,28 @@ func (c *Client) runSOCKS5(ctx context.Context, port int) error { } } - go c.handleSOCKS5(conn) + go c.handleSOCKS5(conn, username, password) } } -func (c *Client) handleSOCKS5(conn net.Conn) { - defer conn.Close() - - buf := make([]byte, 256) - - if _, err := io.ReadFull(conn, buf[:2]); err != nil { - return +func (c *Client) closePeers() { + for _, peer := range c.peers { + if err := peer.Close(); err != nil { + logger.Debug("Peer close error: %v", err) + } } +} - if buf[0] != 5 { +//nolint:cyclop // SOCKS5 parsing is inherently stateful and mirrors the protocol handshake. +func (c *Client) handleSOCKS5(conn net.Conn, username, password string) { + defer func() { + if err := conn.Close(); err != nil { + logger.Debug("SOCKS5 connection close error: %v", err) + } + }() + + buf := make([]byte, 513) + if !readSOCKSVersionAndMethods(conn, buf) { return } @@ -223,58 +347,160 @@ func (c *Client) handleSOCKS5(conn net.Conn) { return } - conn.Write([]byte{5, 0}) + requireAuth := username != "" + wantMethod := byte(0x00) + if requireAuth { + wantMethod = 0x02 + } - if _, err := io.ReadFull(conn, buf[:4]); err != nil { + if !supportsMethod(buf[:nmethods], wantMethod) { + writeResponse(conn, replyUnsupportedSOCKSMethod()) + return + } + writeResponse(conn, []byte{5, wantMethod}) + + if requireAuth && !authenticateSOCKSUser(conn, buf, username, password) { return } - if buf[1] != 1 { - conn.Write([]byte{5, 7, 0, 1, 0, 0, 0, 0, 0, 0}) + addr, port, ok := readConnectTarget(conn, buf) + if !ok { return } - var addr string - atyp := buf[3] - - switch atyp { - case 1: - if _, err := io.ReadFull(conn, buf[:4]); err != nil { - return - } - addr = fmt.Sprintf("%d.%d.%d.%d", buf[0], buf[1], buf[2], buf[3]) - case 3: - if _, err := io.ReadFull(conn, buf[:1]); err != nil { - return - } - length := buf[0] - if _, err := io.ReadFull(conn, buf[:length]); err != nil { - return - } - addr = string(buf[:length]) - default: - conn.Write([]byte{5, 8, 0, 1, 0, 0, 0, 0, 0, 0}) - return - } - - if _, err := io.ReadFull(conn, buf[:2]); err != nil { - return - } - port := binary.BigEndian.Uint16(buf[:2]) - sid := c.mux.OpenStream() logger.Verbose("SOCKS5 opened stream sid=%d for %s:%d", sid, addr, port) log.Printf("[CLIENT] sid=%d SOCKS5_START %s:%d", sid, addr, port) - req := map[string]interface{}{ - "cmd": "connect", - "addr": addr, - "port": port, + if !c.sendConnectRequest(sid, addr, port) { + return } - reqData, _ := json.Marshal(req) - c.mux.SendData(sid, reqData) + if !c.waitConnectResponse(conn, sid) { + return + } + c.mux.ReadStream(sid) + writeResponse(conn, replySuccess()) + c.proxyStream(conn, sid) +} + +func readSOCKSVersionAndMethods(conn net.Conn, buf []byte) bool { + if _, err := io.ReadFull(conn, buf[:2]); err != nil { + return false + } + + return buf[0] == 5 +} + +func supportsMethod(methods []byte, wantMethod byte) bool { + for _, method := range methods { + if method == wantMethod { + return true + } + } + + return false +} + +func authenticateSOCKSUser(conn net.Conn, buf []byte, username, password string) bool { + if _, err := io.ReadFull(conn, buf[:2]); err != nil { + return false + } + if buf[0] != 0x01 { + return false + } + + ulen := int(buf[1]) + if _, err := io.ReadFull(conn, buf[:ulen+1]); err != nil { + return false + } + + gotUser := string(buf[:ulen]) + plen := int(buf[ulen]) + if _, err := io.ReadFull(conn, buf[:plen]); err != nil { + return false + } + + gotPass := string(buf[:plen]) + if gotUser != username || gotPass != password { + writeResponse(conn, replyAuthFailed()) + return false + } + + writeResponse(conn, replyAuthOK()) + return true +} + +func readConnectTarget(conn net.Conn, buf []byte) (string, uint16, bool) { + if _, err := io.ReadFull(conn, buf[:4]); err != nil { + return "", 0, false + } + + if buf[1] != 1 { + writeResponse(conn, replyCommandNotSupported()) + return "", 0, false + } + + addr, ok := readTargetAddress(conn, buf, buf[3]) + if !ok { + return "", 0, false + } + + if _, err := io.ReadFull(conn, buf[:2]); err != nil { + return "", 0, false + } + + return addr, binary.BigEndian.Uint16(buf[:2]), true +} + +func readTargetAddress(conn net.Conn, buf []byte, atyp byte) (string, bool) { + switch atyp { + case 1: + if _, err := io.ReadFull(conn, buf[:4]); err != nil { + return "", false + } + return fmt.Sprintf("%d.%d.%d.%d", buf[0], buf[1], buf[2], buf[3]), true + case 3: + if _, err := io.ReadFull(conn, buf[:1]); err != nil { + return "", false + } + + length := buf[0] + if _, err := io.ReadFull(conn, buf[:length]); err != nil { + return "", false + } + return string(buf[:length]), true + default: + writeResponse(conn, replyAddressNotSupported()) + return "", false + } +} + +func (c *Client) sendConnectRequest(sid uint16, addr string, port uint16) bool { + reqData, err := json.Marshal(struct { + Cmd string `json:"cmd"` + Addr string `json:"addr"` + Port uint16 `json:"port"` + }{ + Cmd: "connect", + Addr: addr, + Port: port, + }) + if err != nil { + logger.Debug("Connect request marshal error: %v", err) + return false + } + + if err := c.mux.SendData(sid, reqData); err != nil { + logger.Debug("Connect request send error: %v", err) + return false + } + + return true +} + +func (c *Client) waitConnectResponse(conn net.Conn, sid uint16) bool { dataReady := c.mux.WaitForData(sid) timeout := time.NewTimer(10 * time.Second) defer timeout.Stop() @@ -283,18 +509,19 @@ func (c *Client) handleSOCKS5(conn net.Conn) { case <-dataReady: stream := c.mux.GetStream(sid) if stream == nil || len(stream.RecvBuf()) == 0 { - conn.Write([]byte{5, 4, 0, 1, 0, 0, 0, 0, 0, 0}) - return + writeResponse(conn, replyHostUnreachable()) + return false } case <-timeout.C: - conn.Write([]byte{5, 4, 0, 1, 0, 0, 0, 0, 0, 0}) - return + writeResponse(conn, replyHostUnreachable()) + return false } - c.mux.ReadStream(sid) - - conn.Write([]byte{5, 0, 0, 1, 0, 0, 0, 0, 0, 0}) + return true +} +//nolint:cyclop // The stream pump handles two coordinated goroutines and shutdown races in one place. +func (c *Client) proxyStream(conn net.Conn, sid uint16) { done := make(chan struct{}) streamClosed := make(chan struct{}) @@ -304,7 +531,9 @@ func (c *Client) handleSOCKS5(conn net.Conn) { for { n, err := conn.Read(buf) if err != nil { - c.mux.CloseStream(sid) + if err := c.mux.CloseStream(sid); err != nil { + logger.Debug("Close stream error: %v", err) + } return } if err := c.mux.SendData(sid, buf[:n]); err != nil { @@ -326,14 +555,8 @@ func (c *Client) handleSOCKS5(conn net.Conn) { return case <-ticker.C: data := c.mux.ReadStream(sid) - if len(data) > 0 { - for len(data) > 0 { - n, err := conn.Write(data) - if err != nil { - return - } - data = data[n:] - } + if len(data) > 0 && !writeStreamData(conn, data) { + return } if c.mux.StreamClosed(sid) { @@ -348,3 +571,49 @@ func (c *Client) handleSOCKS5(conn net.Conn) { case <-streamClosed: } } + +func writeStreamData(conn net.Conn, data []byte) bool { + for len(data) > 0 { + n, err := conn.Write(data) + if err != nil { + return false + } + data = data[n:] + } + + return true +} + +func writeResponse(conn net.Conn, response []byte) { + if _, err := conn.Write(response); err != nil { + logger.Debug("SOCKS5 response write error: %v", err) + } +} + +func replyUnsupportedSOCKSMethod() []byte { + return []byte{5, 0xFF} +} + +func replyAuthFailed() []byte { + return []byte{0x01, 0x01} +} + +func replyAuthOK() []byte { + return []byte{0x01, 0x00} +} + +func replyCommandNotSupported() []byte { + return []byte{5, 7, 0, 1, 0, 0, 0, 0, 0, 0} +} + +func replyAddressNotSupported() []byte { + return []byte{5, 8, 0, 1, 0, 0, 0, 0, 0, 0} +} + +func replyHostUnreachable() []byte { + return []byte{5, 4, 0, 1, 0, 0, 0, 0, 0, 0} +} + +func replySuccess() []byte { + return []byte{5, 0, 0, 1, 0, 0, 0, 0, 0, 0} +} diff --git a/internal/mux/mux.go b/internal/mux/mux.go index 30a7934..83bee25 100644 --- a/internal/mux/mux.go +++ b/internal/mux/mux.go @@ -6,19 +6,33 @@ package mux import ( "encoding/binary" + "errors" "sync" + "time" "github.com/openlibrecommunity/olcrtc/internal/logger" ) +const ( + ControlStreamID uint16 = 0xFFFF + ControlLength uint16 = 0xFFFF + + ControlResetClient uint32 = 1 +) + +type ControlFrame struct { + ClientID uint32 + Type uint32 +} + type Stream struct { - ID uint16 - ClientID uint32 - recvBuf []byte - closed bool - mu sync.Mutex - nextSeq uint32 - outOfOrder map[uint32][]byte + ID uint16 + ClientID uint32 + recvBuf []byte + closed bool + mu sync.Mutex + nextSeq uint32 + outOfOrder map[uint32][]byte } func (s *Stream) RecvBuf() []byte { @@ -48,7 +62,7 @@ func New(clientID uint32, onSend func([]byte) error) *Multiplexer { clientID: clientID, onSend: onSend, maxStreams: 10000, - maxBufferSize: 16 * 1024 * 1024, + maxBufferSize: 32 * 1024 * 1024, dataReady: make(map[uint16]chan struct{}), sendSeq: make(map[uint16]uint32), } @@ -64,7 +78,7 @@ func (m *Multiplexer) OpenStream() uint16 { if m.nextID == 0 { m.nextID = 1 } - + if _, exists := m.streams[sid]; !exists { m.streams[sid] = &Stream{ ID: sid, @@ -86,9 +100,10 @@ func (m *Multiplexer) SendData(sid uint16, data []byte) error { return nil } - const chunkSize = 7168 + // Keep encrypted DataChannel messages below Telemost's observed 8 KiB cap. + const chunkSize = 7000 totalChunks := (len(data) + chunkSize - 1) / chunkSize - + if totalChunks > 10 { logger.Debug("SendData: sid=%d, size=%d bytes, chunks=%d", sid, len(data), totalChunks) } @@ -100,12 +115,12 @@ func (m *Multiplexer) SendData(sid uint16, data []byte) error { } chunk := data[i:end] - + m.sendSeqMu.Lock() seq := m.sendSeq[sid] m.sendSeq[sid]++ m.sendSeqMu.Unlock() - + frame := make([]byte, 12+len(chunk)) binary.BigEndian.PutUint32(frame[0:4], m.clientID) binary.BigEndian.PutUint16(frame[4:6], sid) @@ -128,7 +143,7 @@ func (m *Multiplexer) CloseStream(sid uint16) error { if stream, exists := m.streams[sid]; exists { stream.closed = true } - + m.sendSeqMu.Lock() delete(m.sendSeq, sid) m.sendSeqMu.Unlock() @@ -142,24 +157,47 @@ func (m *Multiplexer) CloseStream(sid uint16) error { return m.onSend(frame) } -func (m *Multiplexer) HandleFrame(frame []byte) { +func (m *Multiplexer) SendClientReset() error { + if m.clientID == 0 { + return errors.New("client reset requires a non-zero client id") + } + return m.onSend(BuildControlFrame(m.clientID, ControlResetClient)) +} + +func BuildControlFrame(clientID uint32, controlType uint32) []byte { + frame := make([]byte, 12) + binary.BigEndian.PutUint32(frame[0:4], clientID) + binary.BigEndian.PutUint16(frame[4:6], ControlStreamID) + binary.BigEndian.PutUint16(frame[6:8], ControlLength) + binary.BigEndian.PutUint32(frame[8:12], controlType) + return frame +} + +func ParseControlFrame(frame []byte) (ControlFrame, bool) { + if len(frame) < 12 { + return ControlFrame{}, false + } + + sid := binary.BigEndian.Uint16(frame[4:6]) + length := binary.BigEndian.Uint16(frame[6:8]) + if sid != ControlStreamID || length != ControlLength { + return ControlFrame{}, false + } + + return ControlFrame{ + ClientID: binary.BigEndian.Uint32(frame[0:4]), + Type: binary.BigEndian.Uint32(frame[8:12]), + }, true +} + +func (m *Multiplexer) HandleFrame(frame []byte) { + control, ok := ParseControlFrame(frame) + if ok { + m.handleControlFrame(control) + return + } + if len(frame) < 12 { - if len(frame) >= 8 { - clientID := binary.BigEndian.Uint32(frame[0:4]) - sid := binary.BigEndian.Uint16(frame[4:6]) - length := binary.BigEndian.Uint16(frame[6:8]) - - if sid == 0xFFFF && length == 0xFFFF { - m.mu.Lock() - for streamSid, stream := range m.streams { - if stream.ClientID == clientID { - stream.closed = true - delete(m.streams, streamSid) - } - } - m.mu.Unlock() - } - } return } @@ -168,18 +206,6 @@ func (m *Multiplexer) HandleFrame(frame []byte) { length := binary.BigEndian.Uint16(frame[6:8]) seq := binary.BigEndian.Uint32(frame[8:12]) - if sid == 0xFFFF && length == 0xFFFF { - m.mu.Lock() - for streamSid, stream := range m.streams { - if stream.ClientID == clientID { - stream.closed = true - delete(m.streams, streamSid) - } - } - m.mu.Unlock() - return - } - if length == 0 { m.mu.Lock() if stream, exists := m.streams[sid]; exists && stream.ClientID == clientID { @@ -197,7 +223,7 @@ func (m *Multiplexer) HandleFrame(frame []byte) { m.mu.Lock() defer m.mu.Unlock() - + stream, exists := m.streams[sid] if !exists { if len(m.streams) >= m.maxStreams { @@ -218,29 +244,41 @@ func (m *Multiplexer) HandleFrame(frame []byte) { stream.nextSeq = 0 stream.outOfOrder = make(map[uint32][]byte) } - + if seq == stream.nextSeq { - if len(stream.recvBuf)+len(data) > m.maxBufferSize { - stream.closed = true + // Backpressure: if the stream buffer is full, release the mux lock and + // wait for the reader to drain it. Dropping/closing here would corrupt + // the TCP stream carried over the mux — large HTTP/2 downloads (X, + // Instagram, YouTube) that push data faster than conn.Write can accept + // would lose bytes and hang forever. + if s := m.waitForBufferSpace(sid, clientID, len(data)); s == nil { return + } else { + stream = s } stream.recvBuf = append(stream.recvBuf, data...) stream.nextSeq++ - + for { - if nextData, ok := stream.outOfOrder[stream.nextSeq]; ok { - if len(stream.recvBuf)+len(nextData) > m.maxBufferSize { - stream.closed = true - return - } - stream.recvBuf = append(stream.recvBuf, nextData...) - delete(stream.outOfOrder, stream.nextSeq) - stream.nextSeq++ - } else { + nextData, ok := stream.outOfOrder[stream.nextSeq] + if !ok { break } + if s := m.waitForBufferSpace(sid, clientID, len(nextData)); s == nil { + return + } else { + stream = s + } + nextData, ok = stream.outOfOrder[stream.nextSeq] + if !ok { + break + } + stream.recvBuf = append(stream.recvBuf, nextData...) + delete(stream.outOfOrder, stream.nextSeq) + stream.nextSeq++ + logger.Verbose("Applied out-of-order packet sid=%d seq=%d", sid, stream.nextSeq-1) } - + m.dataReadyMu.Lock() if ch, ok := m.dataReady[sid]; ok { select { @@ -256,6 +294,46 @@ func (m *Multiplexer) HandleFrame(frame []byte) { } } +func (m *Multiplexer) handleControlFrame(control ControlFrame) { + switch control.Type { + case ControlResetClient: + m.ResetClient(control.ClientID) + default: + logger.Debug("Unknown mux control frame type=%d clientID=%d", control.Type, control.ClientID) + } +} + +func (m *Multiplexer) ResetClient(clientID uint32) { + m.mu.Lock() + defer m.mu.Unlock() + + for streamSid, stream := range m.streams { + if stream.ClientID == clientID { + stream.closed = true + delete(m.streams, streamSid) + } + } +} + +// waitForBufferSpace releases m.mu and waits until the stream's recvBuf has +// room for `need` more bytes, then re-acquires the lock. Returns the (possibly +// re-fetched) stream, or nil if the stream disappeared / was reset / closed. +// Caller must hold m.mu (write-locked) on entry and will hold it on return. +func (m *Multiplexer) waitForBufferSpace(sid uint16, clientID uint32, need int) *Stream { + for { + stream, ok := m.streams[sid] + if !ok || stream.ClientID != clientID || stream.closed { + return nil + } + if len(stream.recvBuf)+need <= m.maxBufferSize { + return stream + } + m.mu.Unlock() + time.Sleep(5 * time.Millisecond) + m.mu.Lock() + } +} + func (m *Multiplexer) ReadStream(sid uint16) []byte { m.mu.Lock() defer m.mu.Unlock() @@ -302,10 +380,10 @@ func (m *Multiplexer) Reset() { for _, stream := range m.streams { stream.closed = true } - + m.streams = make(map[uint16]*Stream) m.nextID = 1 - + m.sendSeqMu.Lock() m.sendSeq = make(map[uint16]uint32) m.sendSeqMu.Unlock() @@ -314,14 +392,14 @@ func (m *Multiplexer) Reset() { func (m *Multiplexer) UpdateSendFunc(onSend func([]byte) error) { m.mu.Lock() defer m.mu.Unlock() - + m.onSend = onSend } func (m *Multiplexer) WaitForData(sid uint16) <-chan struct{} { m.dataReadyMu.Lock() defer m.dataReadyMu.Unlock() - + if _, ok := m.dataReady[sid]; !ok { m.dataReady[sid] = make(chan struct{}, 1) } @@ -331,7 +409,7 @@ func (m *Multiplexer) WaitForData(sid uint16) <-chan struct{} { func (m *Multiplexer) CleanupDataChannel(sid uint16) { m.dataReadyMu.Lock() defer m.dataReadyMu.Unlock() - + if ch, ok := m.dataReady[sid]; ok { close(ch) delete(m.dataReady, sid) diff --git a/internal/names/data/names b/internal/names/data/names new file mode 100644 index 0000000..60e45ff --- /dev/null +++ b/internal/names/data/names @@ -0,0 +1,735 @@ +Аарон +Аба +Аббас +Абд аль-Узза +Абдуллах +Абид +Аботур +Аввакум +Август +Авдей +Авель +Аверкий +Авигдор +Авирмэд +Авксентий +Авл +Авнер +Аврелий +Автандил +Автоном +Агапит +Агафангел +Агафодор +Агафон +Аги +Агриппа +Адам +Адар +Адиль +Адольф +Адонирам +Адриан +Азамат +Азарий +Азат +Азиз +Азим +Айварс +Айдар +Айрат +Акакий +Аквилий +Акиф +Акоп +Аксель +Алан +Аланус +Алек +Александр +Алексей +Алемдар +Алик +Алим +Алипий +Алишер +Алмат +Алоиз +Алон +Альберик +Альберт +Альбин +Альваро +Альвиан +Альвизе +Альфонс +Альфред +Амадис +Амвросий +Амедей +Амин +Амир +Амр +Амфилохий +Анания +Анас +Анастасий +Анатолий +Ангеляр +Андокид +Андрей +Андроник +Аннерс +Анри +Ансельм +Антипа +Антон +Антоний +Антонин +Антуан +Арам +Арефа +Арзуман +Аристарх +Аристон +Ариф +Аркадий +Арсений +Артём +Артур +Арфаксад +Асаф +Атанасий +Атом +Аттик +Афанасий +Афинагор +Афиней +Афиф +Африкан +Ахилл +Ахмад +Ахтям +Ашот +Бадар +Барни +Бартоломео +Басир +Бахтияр +Баян +Безсон +Бен +Беньямин +Берт +Бехруз +Билял +Богдан +Болеслав +Бонавентура +Борис +Борислав +Боян +Бронислав +Брячислав +Бурхан +Бутрос +Бямбасурэн +Вадим +Валентин +Валентино +Валерий +Валерьян +Вальдемар +Вангьял +Варлам +Варнава +Варфоломей +Василий +Вахтанг +Велвел +Венансио +Венедикт +Вениамин +Венцеслав +Вигго +Викентий +Виктор +Викторин +Вильгельм +Винцас +Виссарион +Виталий +Витаутас +Вито +Владимир +Владислав +Владлен +Влас +Воислав +Володарь +Вольфганг +Вописк +Всеволод +Всеслав +Вук +Вукол +Вышеслав +Вячеслав +Габриеле +Гавриил +Гай +Галактион +Галымжан +Гамлет +Гаспар +Гафур +Гвидо +Гейдар +Геласий +Гелий +Гельмут +Геннадий +Генри +Генрих +Георге +Георгий +Гераклид +Герасим +Герберт +Герман +Германн +Геронтий +Герхард +Гийом +Гильем +Гинкмар +Глеб +Гней +Гоар +Горацио +Гордей +Градислав +Григорий +Гримоальд +Гуго +Гурий +Густав +Гьялцен +Давид +Дамдинсурэн +Дамир +Даниил +Дарий +Демид +Демьян +Денеш +Денис +Децим +Джаббар +Джамиль +Джан +Джанер +Джанфранко +Джафар +Джейкоб +Джихангир +Джованни +Джон +Джохар +Джулиано +Джулиус +Дино +Диодор +Дитер +Дитмар +Дитрих +Дмитрий +Доминик +Дональд +Донат +Дорофей +Досифей +Евгений +Евграф +Евдоким +Еврит +Евсей +Евстафий +Евтихан +Евтихий +Егор +Елеазар +Елисей +Емельян +Епифаний +Ербол +Ерванд +Еремей +Ермак +Ермолай +Ерофей +Ефим +Ефрем +Жан +Ждан +Жером +Жоан +Захар +Захария +Збигнев +Зденек +Зейналабдин +Зенон +Зеэв +Зигмунд +Зинон +Зия +Золтан +Зосима +Иакинф +Иан +Ибрагим +Ибрахим +Иван +Игнатий +Игорь +Иероним +Иерофей +Израиль +Икрима +Иларий +Илия +Илларион +Илмари +Ильфат +Илья +Имран +Иннокентий +Иоаким +Иоанн +Иоанникий +Иоахим +Иов +Иоганн +Иоганнес +Ионафан +Иосафат +Ираклий +Иржи +Иринарх +Ириней +Иродион +Иса +Исаак +Исаакий +Исаия +Исидор +Ислам +Исмаил +Истислав +Истома +Истукарий +Иштван +Йюрген +Кадваллон +Кадир +Казимир +Каликст +Калин +Каллистрат +Кальман +Канат +Карен +Карлос +Карп +Картерий +Кассиан +Кассий +Касторий +Касьян +Катберт +Квинт +Кехлер +Киллиан +Ким +Кир +Кириак +Кирилл +Клаас +Клавдиан +Клеоник +Климент +Кондрат +Конон +Конрад +Константин +Корнелиус +Корнилий +Коррадо +Косьма +Кратет +Кратипп +Крис +Криспин +Кристиан +Кронид +Кузьма +Куприян +Курбан +Курт +Кутлуг-Буга +Кэлин +Лаврентий +Лавс +Ладислав +Лазарь +Лайл +Лампрехт +Ландульф +Лев +Леви +Ленни +Леонид +Леонтий +Леонхард +Лиам +Линкей +Логгин +Лоренц +Лоренцо +Луи +Луитпольд +Лука +Лукас +Лукий +Лукьян +Луций +Людовик +Люцифер +Макар +Максим +Максимиан +Максимилиан +Малик +Малх +Мамбет +Маний +Мануил +Мануэль +Мариан +Мариус +Марк +Маркел +Мартын +Марчелло +Матвей +Матео +Матиас +Матфей +Матфий +Махмуд +Меир +Мелентий +Мелитон +Менахем-Мендель +Месроп +Мефодий +Мечислав +Мика +Микеланджело +Микулаш +Милорад +Мина +Мирко +Мирон +Мирослав +Митрофан +Михаил +Михей +Младан +Модест +Моисей +Мордехай +Мстислав +Мурад +Мухаммед +Мэдисон +Мэлор +Мэлс +Назар +Наиль +Насиф +Натан +Натаниэль +Наум +Нафанаил +Нацагдорж +Нестор +Никандр +Никанор +Никита +Никифор +Никодим +Николай +Нил +Нильс +Ноа +Ной +Норд +Нуржан +Нурлан +Овадья +Оге +Одинец +Октав +Октавиан +Октавий +Октавио +Олаф +Оле +Олег +Оливер +Ольгерд +Онисим +Орест +Осип +Оскар +Осман +Отто +Оттон +Очирбат +Пабло +Павел +Павлин +Павсикакий +Паисий +Палладий +Панкратий +Пантелеймон +Папа +Паруйр +Парфений +Патрик +Пафнутий +Пахомий +Педро +Пётр +Пимен +Пинхас +Пипин +Питирим +Пол +Полидор +Полиевкт +Поликарп +Поликрат +Порфирий +Потап +Предраг +Премысл +Приск +Прокл +Прокопий +Прокул +Протасий +Прохор +Публий +Рагнар +Рагуил +Радмир +Радослав +Разумник +Раймонд +Рамадан +Рамазан +Рахман +Рашад +Рейнхард +Ренат +Реститут +Ричард +Роберт +Родерик +Родион +Рожер +Розарио +Роман +Ромен +Рон +Ронан +Ростислав +Рудольф +Руслан +Руф +Руфин +Рушан +Сабит +Савва +Савватий +Савелий +Савин +Саддам +Садик +Саид +Салават +Салих +Саллюстий +Салман +Самуил +Сармат +Святослав +Севастьян +Северин +Секст +Секунд +Семён +Септимий +Серапион +Сергей +Серж +Сигеберт +Сильвестр +Симеон +Симон +Созон +Соломон +Сонам +Софрон +Спиридон +Срджан +Станислав +Степан +Стефано +Стивен +Таврион +Тавус +Тадеуш +Тарас +Тарасий +Тейс +Тендзин +Теофил +Терентий +Терри +Тиберий +Тигран +Тимофей +Тимур +Тихомир +Тихон +Томас +Томоми +Торос +Тофик +Трифон +Трофим +Тудхалия +Тутмос +Тьерри +Тьяго +Уве +Уильям +Улдис +Ульрих +Ульф +Умар +Урызмаг +Усама +Усман +Фавст +Фаддей +Файзулла +Фарид +Фахраддин +Федериго +Федосей +Федот +Фейсал +Феликс +Феоктист +Феофан +Феофил +Феофилакт +Фердинанд +Ференц +Фёдор +Фидель +Филарет +Филат +Филип +Филипп +Философ +Филострат +Фирс +Фока +Фома +Фотий +Франц +Франческо +Фредерик +Фридрих +Фродо +Фрол +Фульк +Хайме +Ханс +Харальд +Харитон +Харри +Харрисон +Хасан +Хетаг +Хильдерик +Хирам +Хлодвиг +Хокон +Хорив +Хоселито +Хосрой +Хрисанф +Христофор +Хуан +Цэрэндорж +Чеслав +Шалом +Шамиль +Шамсуддин +Шапур +Шарль +Шейх-Хайдар +Шон +Эберхард +Эдмунд +Эдна +Эдуард +Элбэгдорж +Элджернон +Элиас +Эллиот +Эмиль +Энрик +Энрико +Энтони +Эразм +Эраст +Эрик +Эрнст +Эсекьель +Эстебан +Этьен +Ювеналий +Юлиан +Юлий +Юлиус +Юрий +Юстас +Юстин +Яков +Якуб +Якун +Ян +Яни +Януарий +Яромир +Ярополк +Ярослав diff --git a/internal/names/data/surnames b/internal/names/data/surnames new file mode 100644 index 0000000..60e45ff --- /dev/null +++ b/internal/names/data/surnames @@ -0,0 +1,735 @@ +Аарон +Аба +Аббас +Абд аль-Узза +Абдуллах +Абид +Аботур +Аввакум +Август +Авдей +Авель +Аверкий +Авигдор +Авирмэд +Авксентий +Авл +Авнер +Аврелий +Автандил +Автоном +Агапит +Агафангел +Агафодор +Агафон +Аги +Агриппа +Адам +Адар +Адиль +Адольф +Адонирам +Адриан +Азамат +Азарий +Азат +Азиз +Азим +Айварс +Айдар +Айрат +Акакий +Аквилий +Акиф +Акоп +Аксель +Алан +Аланус +Алек +Александр +Алексей +Алемдар +Алик +Алим +Алипий +Алишер +Алмат +Алоиз +Алон +Альберик +Альберт +Альбин +Альваро +Альвиан +Альвизе +Альфонс +Альфред +Амадис +Амвросий +Амедей +Амин +Амир +Амр +Амфилохий +Анания +Анас +Анастасий +Анатолий +Ангеляр +Андокид +Андрей +Андроник +Аннерс +Анри +Ансельм +Антипа +Антон +Антоний +Антонин +Антуан +Арам +Арефа +Арзуман +Аристарх +Аристон +Ариф +Аркадий +Арсений +Артём +Артур +Арфаксад +Асаф +Атанасий +Атом +Аттик +Афанасий +Афинагор +Афиней +Афиф +Африкан +Ахилл +Ахмад +Ахтям +Ашот +Бадар +Барни +Бартоломео +Басир +Бахтияр +Баян +Безсон +Бен +Беньямин +Берт +Бехруз +Билял +Богдан +Болеслав +Бонавентура +Борис +Борислав +Боян +Бронислав +Брячислав +Бурхан +Бутрос +Бямбасурэн +Вадим +Валентин +Валентино +Валерий +Валерьян +Вальдемар +Вангьял +Варлам +Варнава +Варфоломей +Василий +Вахтанг +Велвел +Венансио +Венедикт +Вениамин +Венцеслав +Вигго +Викентий +Виктор +Викторин +Вильгельм +Винцас +Виссарион +Виталий +Витаутас +Вито +Владимир +Владислав +Владлен +Влас +Воислав +Володарь +Вольфганг +Вописк +Всеволод +Всеслав +Вук +Вукол +Вышеслав +Вячеслав +Габриеле +Гавриил +Гай +Галактион +Галымжан +Гамлет +Гаспар +Гафур +Гвидо +Гейдар +Геласий +Гелий +Гельмут +Геннадий +Генри +Генрих +Георге +Георгий +Гераклид +Герасим +Герберт +Герман +Германн +Геронтий +Герхард +Гийом +Гильем +Гинкмар +Глеб +Гней +Гоар +Горацио +Гордей +Градислав +Григорий +Гримоальд +Гуго +Гурий +Густав +Гьялцен +Давид +Дамдинсурэн +Дамир +Даниил +Дарий +Демид +Демьян +Денеш +Денис +Децим +Джаббар +Джамиль +Джан +Джанер +Джанфранко +Джафар +Джейкоб +Джихангир +Джованни +Джон +Джохар +Джулиано +Джулиус +Дино +Диодор +Дитер +Дитмар +Дитрих +Дмитрий +Доминик +Дональд +Донат +Дорофей +Досифей +Евгений +Евграф +Евдоким +Еврит +Евсей +Евстафий +Евтихан +Евтихий +Егор +Елеазар +Елисей +Емельян +Епифаний +Ербол +Ерванд +Еремей +Ермак +Ермолай +Ерофей +Ефим +Ефрем +Жан +Ждан +Жером +Жоан +Захар +Захария +Збигнев +Зденек +Зейналабдин +Зенон +Зеэв +Зигмунд +Зинон +Зия +Золтан +Зосима +Иакинф +Иан +Ибрагим +Ибрахим +Иван +Игнатий +Игорь +Иероним +Иерофей +Израиль +Икрима +Иларий +Илия +Илларион +Илмари +Ильфат +Илья +Имран +Иннокентий +Иоаким +Иоанн +Иоанникий +Иоахим +Иов +Иоганн +Иоганнес +Ионафан +Иосафат +Ираклий +Иржи +Иринарх +Ириней +Иродион +Иса +Исаак +Исаакий +Исаия +Исидор +Ислам +Исмаил +Истислав +Истома +Истукарий +Иштван +Йюрген +Кадваллон +Кадир +Казимир +Каликст +Калин +Каллистрат +Кальман +Канат +Карен +Карлос +Карп +Картерий +Кассиан +Кассий +Касторий +Касьян +Катберт +Квинт +Кехлер +Киллиан +Ким +Кир +Кириак +Кирилл +Клаас +Клавдиан +Клеоник +Климент +Кондрат +Конон +Конрад +Константин +Корнелиус +Корнилий +Коррадо +Косьма +Кратет +Кратипп +Крис +Криспин +Кристиан +Кронид +Кузьма +Куприян +Курбан +Курт +Кутлуг-Буга +Кэлин +Лаврентий +Лавс +Ладислав +Лазарь +Лайл +Лампрехт +Ландульф +Лев +Леви +Ленни +Леонид +Леонтий +Леонхард +Лиам +Линкей +Логгин +Лоренц +Лоренцо +Луи +Луитпольд +Лука +Лукас +Лукий +Лукьян +Луций +Людовик +Люцифер +Макар +Максим +Максимиан +Максимилиан +Малик +Малх +Мамбет +Маний +Мануил +Мануэль +Мариан +Мариус +Марк +Маркел +Мартын +Марчелло +Матвей +Матео +Матиас +Матфей +Матфий +Махмуд +Меир +Мелентий +Мелитон +Менахем-Мендель +Месроп +Мефодий +Мечислав +Мика +Микеланджело +Микулаш +Милорад +Мина +Мирко +Мирон +Мирослав +Митрофан +Михаил +Михей +Младан +Модест +Моисей +Мордехай +Мстислав +Мурад +Мухаммед +Мэдисон +Мэлор +Мэлс +Назар +Наиль +Насиф +Натан +Натаниэль +Наум +Нафанаил +Нацагдорж +Нестор +Никандр +Никанор +Никита +Никифор +Никодим +Николай +Нил +Нильс +Ноа +Ной +Норд +Нуржан +Нурлан +Овадья +Оге +Одинец +Октав +Октавиан +Октавий +Октавио +Олаф +Оле +Олег +Оливер +Ольгерд +Онисим +Орест +Осип +Оскар +Осман +Отто +Оттон +Очирбат +Пабло +Павел +Павлин +Павсикакий +Паисий +Палладий +Панкратий +Пантелеймон +Папа +Паруйр +Парфений +Патрик +Пафнутий +Пахомий +Педро +Пётр +Пимен +Пинхас +Пипин +Питирим +Пол +Полидор +Полиевкт +Поликарп +Поликрат +Порфирий +Потап +Предраг +Премысл +Приск +Прокл +Прокопий +Прокул +Протасий +Прохор +Публий +Рагнар +Рагуил +Радмир +Радослав +Разумник +Раймонд +Рамадан +Рамазан +Рахман +Рашад +Рейнхард +Ренат +Реститут +Ричард +Роберт +Родерик +Родион +Рожер +Розарио +Роман +Ромен +Рон +Ронан +Ростислав +Рудольф +Руслан +Руф +Руфин +Рушан +Сабит +Савва +Савватий +Савелий +Савин +Саддам +Садик +Саид +Салават +Салих +Саллюстий +Салман +Самуил +Сармат +Святослав +Севастьян +Северин +Секст +Секунд +Семён +Септимий +Серапион +Сергей +Серж +Сигеберт +Сильвестр +Симеон +Симон +Созон +Соломон +Сонам +Софрон +Спиридон +Срджан +Станислав +Степан +Стефано +Стивен +Таврион +Тавус +Тадеуш +Тарас +Тарасий +Тейс +Тендзин +Теофил +Терентий +Терри +Тиберий +Тигран +Тимофей +Тимур +Тихомир +Тихон +Томас +Томоми +Торос +Тофик +Трифон +Трофим +Тудхалия +Тутмос +Тьерри +Тьяго +Уве +Уильям +Улдис +Ульрих +Ульф +Умар +Урызмаг +Усама +Усман +Фавст +Фаддей +Файзулла +Фарид +Фахраддин +Федериго +Федосей +Федот +Фейсал +Феликс +Феоктист +Феофан +Феофил +Феофилакт +Фердинанд +Ференц +Фёдор +Фидель +Филарет +Филат +Филип +Филипп +Философ +Филострат +Фирс +Фока +Фома +Фотий +Франц +Франческо +Фредерик +Фридрих +Фродо +Фрол +Фульк +Хайме +Ханс +Харальд +Харитон +Харри +Харрисон +Хасан +Хетаг +Хильдерик +Хирам +Хлодвиг +Хокон +Хорив +Хоселито +Хосрой +Хрисанф +Христофор +Хуан +Цэрэндорж +Чеслав +Шалом +Шамиль +Шамсуддин +Шапур +Шарль +Шейх-Хайдар +Шон +Эберхард +Эдмунд +Эдна +Эдуард +Элбэгдорж +Элджернон +Элиас +Эллиот +Эмиль +Энрик +Энрико +Энтони +Эразм +Эраст +Эрик +Эрнст +Эсекьель +Эстебан +Этьен +Ювеналий +Юлиан +Юлий +Юлиус +Юрий +Юстас +Юстин +Яков +Якуб +Якун +Ян +Яни +Януарий +Яромир +Ярополк +Ярослав diff --git a/internal/names/names.go b/internal/names/names.go index f443fd5..f859d1b 100644 --- a/internal/names/names.go +++ b/internal/names/names.go @@ -1,35 +1,49 @@ +// Package names generates display names for Telemost peers. package names import ( "bufio" - "math/rand/v2" + "crypto/rand" + _ "embed" + "fmt" + "math/big" "os" "strings" ) +//go:embed data/names +var embeddedNames string + +//go:embed data/surnames +var embeddedSurnames string + +//nolint:gochecknoglobals // Package-level state keeps the loaded name dictionaries cached for the process lifetime. var ( - firstNames []string - lastNames []string + firstNames = parseEmbedded(embeddedNames) + lastNames = parseEmbedded(embeddedSurnames) ) -var defaultFirstNames = []string{ - "Александр", "Дмитрий", "Максим", "Сергей", "Андрей", "Алексей", "Артём", "Илья", "Кирилл", "Михаил", - "Никита", "Матвей", "Роман", "Егор", "Арсений", "Иван", "Денис", "Евгений", "Даниил", "Тимофей", - "Владислав", "Игорь", "Владимир", "Павел", "Руслан", "Марк", "Константин", "Николай", "Олег", "Виктор", -} +func parseEmbedded(raw string) []string { + var names []string + for _, line := range strings.Split(raw, "\n") { + line = strings.TrimSpace(line) + if line != "" { + names = append(names, line) + } + } -var defaultLastNames = []string{ - "Иванов", "Смирнов", "Кузнецов", "Попов", "Васильев", "Петров", "Соколов", "Михайлов", "Новиков", "Фёдоров", - "Морозов", "Волков", "Алексеев", "Лебедев", "Семёнов", "Егоров", "Павлов", "Козлов", "Степанов", "Николаев", - "Орлов", "Андреев", "Макаров", "Никитин", "Захаров", "Зайцев", "Соловьёв", "Борисов", "Яковлев", "Григорьев", + return names } func loadNames(path string) ([]string, error) { + //nolint:gosec // Paths come from local CLI/runtime configuration; loading override files is intentional here. file, err := os.Open(path) if err != nil { - return nil, err + return nil, fmt.Errorf("open names file %q: %w", path, err) } - defer file.Close() + defer func() { + _ = file.Close() + }() var names []string scanner := bufio.NewScanner(file) @@ -40,27 +54,44 @@ func loadNames(path string) ([]string, error) { } } - return names, scanner.Err() + if err := scanner.Err(); err != nil { + return nil, fmt.Errorf("scan names file %q: %w", path, err) + } + + return names, nil } +// LoadNameFiles overrides embedded name dictionaries from local files when they are present. func LoadNameFiles(firstPath, lastPath string) error { - firstNames = defaultFirstNames - lastNames = defaultLastNames - - if names, err := loadNames(firstPath); err == nil { + if names, err := loadNames(firstPath); err == nil && len(names) > 0 { firstNames = names } - if names, err := loadNames(lastPath); err == nil { + if names, err := loadNames(lastPath); err == nil && len(names) > 0 { lastNames = names } return nil } +// Generate returns a random display name assembled from the currently loaded dictionaries. func Generate() string { - first := firstNames[rand.IntN(len(firstNames))] - last := lastNames[rand.IntN(len(lastNames))] + if len(firstNames) == 0 || len(lastNames) == 0 { + return "anonymous user" + } - return first + " " + last + return firstNames[randomIndex(len(firstNames))] + " " + lastNames[randomIndex(len(lastNames))] +} + +func randomIndex(limit int) int { + if limit <= 1 { + return 0 + } + + n, err := rand.Int(rand.Reader, big.NewInt(int64(limit))) + if err != nil { + return 0 + } + + return int(n.Int64()) } diff --git a/internal/protect/protect.go b/internal/protect/protect.go new file mode 100644 index 0000000..cf29f91 --- /dev/null +++ b/internal/protect/protect.go @@ -0,0 +1,66 @@ +package protect + +import ( + "context" + "net" + "net/http" + "syscall" + "time" +) + +// Protector is called with a socket file descriptor before connect. +// On Android, this calls VpnService.protect(fd) to bypass VPN routing. +var Protector func(fd int) bool + +func controlFunc(network, address string, c syscall.RawConn) error { + if Protector == nil { + return nil + } + var err error + c.Control(func(fd uintptr) { + if !Protector(int(fd)) { + err = &net.OpError{Op: "protect", Net: network, Err: net.ErrClosed} + } + }) + return err +} + +// NewDialer returns a net.Dialer that calls Protector on each new socket. +func NewDialer() *net.Dialer { + return &net.Dialer{ + Timeout: 10 * time.Second, + KeepAlive: 30 * time.Second, + Control: controlFunc, + } +} + +// NewHTTPClient returns an http.Client using protected sockets. +func NewHTTPClient() *http.Client { + dialer := NewDialer() + transport := &http.Transport{ + DialContext: dialer.DialContext, + ForceAttemptHTTP2: true, + MaxIdleConns: 10, + IdleConnTimeout: 30 * time.Second, + TLSHandshakeTimeout: 10 * time.Second, + ResponseHeaderTimeout: 10 * time.Second, + } + return &http.Client{Transport: transport} +} + +// DialContext dials using a protected socket. +func DialContext(ctx context.Context, network, address string) (net.Conn, error) { + return NewDialer().DialContext(ctx, network, address) +} + +// proxyDialer implements golang.org/x/net/proxy.Dialer for pion ICE. +type proxyDialer struct{} + +func (d *proxyDialer) Dial(network, addr string) (net.Conn, error) { + return NewDialer().Dial(network, addr) +} + +// NewProxyDialer returns a proxy.Dialer that protects ICE sockets. +func NewProxyDialer() *proxyDialer { + return &proxyDialer{} +} diff --git a/internal/server/server.go b/internal/server/server.go index 881ea12..e4222e4 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -3,7 +3,6 @@ package server import ( "context" "crypto/rand" - "encoding/binary" "encoding/hex" "encoding/json" "fmt" @@ -14,12 +13,12 @@ import ( "sync/atomic" "time" - "github.com/pion/webrtc/v4" "github.com/openlibrecommunity/olcrtc/internal/crypto" "github.com/openlibrecommunity/olcrtc/internal/logger" "github.com/openlibrecommunity/olcrtc/internal/mux" "github.com/openlibrecommunity/olcrtc/internal/names" "github.com/openlibrecommunity/olcrtc/internal/telemost" + "github.com/pion/webrtc/v4" ) type Server struct { @@ -28,6 +27,8 @@ type Server struct { mux *mux.Multiplexer connections map[uint16]net.Conn connMu sync.RWMutex + streamPumps map[uint16]net.Conn + pumpMu sync.Mutex peerIdx atomic.Uint32 wg sync.WaitGroup dnsServer string @@ -44,6 +45,8 @@ type ConnectRequest struct { } func Run(ctx context.Context, roomURL, keyHex string, duo bool, dnsServer, socksProxyAddr string, socksProxyPort int) error { + runCtx, cancel := context.WithCancel(ctx) + defer cancel() var key []byte var err error @@ -76,6 +79,7 @@ func Run(ctx context.Context, roomURL, keyHex string, duo bool, dnsServer, socks s := &Server{ cipher: cipher, connections: make(map[uint16]net.Conn), + streamPumps: make(map[uint16]net.Conn), peers: make([]*telemost.Peer, 0), dnsServer: dnsServer, socksProxyAddr: socksProxyAddr, @@ -124,14 +128,23 @@ func Run(ctx context.Context, roomURL, keyHex string, duo bool, dnsServer, socks }) for i := 0; i < peerCount; i++ { + peerID := i peer, err := telemost.NewPeer(roomURL, names.Generate(), s.onData) if err != nil { return err } + peer.SetEndedCallback(func(reason string) { + log.Printf("Server peer %d reported conference end: %s", peerID, reason) + cancel() + }) s.peers = append(s.peers, peer) peer.SetReconnectCallback(func(dc *webrtc.DataChannel) { - log.Printf("Server peer %d reconnected - resetting multiplexer state", i) + if dc == nil { + log.Printf("Server peer %d channel closed - resetting multiplexer state", peerID) + } else { + log.Printf("Server peer %d reconnected - resetting multiplexer state", peerID) + } s.connMu.Lock() for sid, conn := range s.connections { @@ -158,20 +171,20 @@ func Run(ctx context.Context, roomURL, keyHex string, duo bool, dnsServer, socks log.Println("Server multiplexer reset complete") }) - log.Printf("Connecting peer %d to Telemost...", i) - if err := peer.Connect(ctx); err != nil { + log.Printf("Connecting peer %d to Telemost...", peerID) + if err := peer.Connect(runCtx); err != nil { return err } - log.Printf("Peer %d connected", i) + log.Printf("Peer %d connected", peerID) s.wg.Add(1) go func() { defer s.wg.Done() - peer.WatchConnection(ctx) + peer.WatchConnection(runCtx) }() } - err = s.run(ctx) + err = s.run(runCtx) log.Println("Waiting for server goroutines...") s.wg.Wait() @@ -220,49 +233,29 @@ func (s *Server) onData(data []byte) { return } - if len(plaintext) >= 12 { - clientID := binary.BigEndian.Uint32(plaintext[0:4]) - sid := binary.BigEndian.Uint16(plaintext[4:6]) - length := binary.BigEndian.Uint16(plaintext[6:8]) - - if sid == 0xFFFF && length == 0xFFFF { - log.Printf("Received reset signal from client (clientID=%d) - cleaning up", clientID) - s.connMu.Lock() - for streamSid, conn := range s.connections { - stream := s.mux.GetStream(streamSid) - if stream != nil && stream.ClientID == clientID { - if conn != nil { - conn.Close() - } - delete(s.connections, streamSid) - } - } - s.connMu.Unlock() - } - } else if len(plaintext) >= 8 { - clientID := binary.BigEndian.Uint32(plaintext[0:4]) - sid := binary.BigEndian.Uint16(plaintext[4:6]) - length := binary.BigEndian.Uint16(plaintext[6:8]) - - if sid == 0xFFFF && length == 0xFFFF { - log.Printf("Received reset signal from client (clientID=%d) - cleaning up", clientID) - s.connMu.Lock() - for streamSid, conn := range s.connections { - stream := s.mux.GetStream(streamSid) - if stream != nil && stream.ClientID == clientID { - if conn != nil { - conn.Close() - } - delete(s.connections, streamSid) - } - } - s.connMu.Unlock() - } + if control, ok := mux.ParseControlFrame(plaintext); ok && control.Type == mux.ControlResetClient { + log.Printf("Received reset signal from client (clientID=%d) - cleaning up", control.ClientID) + s.closeClientConnections(control.ClientID) } s.mux.HandleFrame(plaintext) } +func (s *Server) closeClientConnections(clientID uint32) { + s.connMu.Lock() + defer s.connMu.Unlock() + + for streamSid, conn := range s.connections { + stream := s.mux.GetStream(streamSid) + if stream != nil && stream.ClientID == clientID { + if conn != nil { + conn.Close() + } + delete(s.connections, streamSid) + } + } +} + func (s *Server) run(ctx context.Context) error { ticker := time.NewTicker(10 * time.Millisecond) defer ticker.Stop() @@ -290,54 +283,81 @@ func (s *Server) run(ctx context.Context) error { case <-ticker.C: } - sids := s.mux.GetStreams() for _, sid := range sids { - go func(sid uint16) { - data := s.mux.ReadStream(sid) - if len(data) > 0 { - s.connMu.RLock() - conn, exists := s.connections[sid] - s.connMu.RUnlock() + if s.mux.StreamClosed(sid) { + s.closeStreamConnection(sid) + continue + } - if exists && conn != nil { - if _, err := conn.Write(data); err != nil { - s.mux.CloseStream(sid) - conn.Close() - s.connMu.Lock() - delete(s.connections, sid) - s.connMu.Unlock() - } - } else { - var req ConnectRequest - if err := json.Unmarshal(data, &req); err == nil && req.Cmd == "connect" { - log.Printf("[SERVER] sid=%d RECEIVED_CONNECT_REQUEST %s:%d", sid, req.Addr, req.Port) - s.connMu.Lock() - if oldConn, exists := s.connections[sid]; exists && oldConn != nil { - oldConn.Close() - } - s.connMu.Unlock() - go s.handleConnect(sid, req) - } - } - } + if s.hasConnection(sid) { + continue + } - if s.mux.StreamClosed(sid) { - s.connMu.Lock() - conn, exists := s.connections[sid] - if exists && conn != nil { - conn.Close() - delete(s.connections, sid) - } - s.connMu.Unlock() - } - }(sid) + data := s.mux.ReadStream(sid) + if len(data) == 0 { + continue + } + + var req ConnectRequest + if err := json.Unmarshal(data, &req); err == nil && req.Cmd == "connect" { + log.Printf("[SERVER] sid=%d RECEIVED_CONNECT_REQUEST %s:%d", sid, req.Addr, req.Port) + s.closeStreamConnection(sid) + go s.handleConnect(ctx, sid, req) + } } } } -func (s *Server) handleConnect(sid uint16, req ConnectRequest) { +func (s *Server) hasConnection(sid uint16) bool { + s.connMu.RLock() + defer s.connMu.RUnlock() + conn := s.connections[sid] + return conn != nil +} + +func (s *Server) closeStreamConnection(sid uint16) { + s.connMu.Lock() + conn := s.connections[sid] + if conn != nil { + conn.Close() + delete(s.connections, sid) + } + s.connMu.Unlock() +} + +func (s *Server) closeStreamConnectionIfCurrent(sid uint16, expected net.Conn) { + s.connMu.Lock() + conn := s.connections[sid] + if conn == expected { + conn.Close() + delete(s.connections, sid) + } + s.connMu.Unlock() +} + +func (s *Server) markStreamPump(sid uint16, conn net.Conn) bool { + s.pumpMu.Lock() + defer s.pumpMu.Unlock() + if current := s.streamPumps[sid]; current == conn { + return false + } else if current != nil { + current.Close() + } + s.streamPumps[sid] = conn + return true +} + +func (s *Server) unmarkStreamPump(sid uint16, conn net.Conn) { + s.pumpMu.Lock() + if s.streamPumps[sid] == conn { + delete(s.streamPumps, sid) + } + s.pumpMu.Unlock() +} + +func (s *Server) handleConnect(ctx context.Context, sid uint16, req ConnectRequest) { startTime := time.Now() addr := fmt.Sprintf("%s:%d", req.Addr, req.Port) logger.Verbose("Handling connect request sid=%d to %s", sid, addr) @@ -379,7 +399,6 @@ func (s *Server) handleConnect(sid uint16, req ConnectRequest) { } logger.Verbose("SOCKS5 proxy dial took %v for sid=%d", time.Since(dialStart), sid) } - dialElapsed := time.Since(dialStart) if err != nil { @@ -388,6 +407,7 @@ func (s *Server) handleConnect(sid uint16, req ConnectRequest) { return } + logger.Verbose("TCP dial took %v for sid=%d", dialElapsed, sid) s.connMu.Lock() s.connections[sid] = conn s.connMu.Unlock() @@ -395,6 +415,7 @@ func (s *Server) handleConnect(sid uint16, req ConnectRequest) { log.Printf("[SERVER] sid=%d CONNECT_SUCCESS dial_time=%v", sid, dialElapsed) s.mux.SendData(sid, []byte{0x00}) + s.startStreamPump(ctx, sid, conn) go func() { defer func() { @@ -434,6 +455,41 @@ func (s *Server) handleConnect(sid uint16, req ConnectRequest) { }() } +func (s *Server) startStreamPump(ctx context.Context, sid uint16, conn net.Conn) { + if !s.markStreamPump(sid, conn) { + return + } + + s.wg.Add(1) + go func() { + defer s.wg.Done() + defer s.unmarkStreamPump(sid, conn) + + ticker := time.NewTicker(10 * time.Millisecond) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + data := s.mux.ReadStream(sid) + if len(data) > 0 { + if _, err := conn.Write(data); err != nil { + s.mux.CloseStream(sid) + s.closeStreamConnectionIfCurrent(sid, conn) + return + } + } + if s.mux.StreamClosed(sid) { + s.closeStreamConnectionIfCurrent(sid, conn) + return + } + } + } + }() +} + func (s *Server) canSendData() bool { for _, peer := range s.peers { if !peer.CanSend() { diff --git a/internal/telemost/api.go b/internal/telemost/api.go index 3c02a74..91e0d18 100644 --- a/internal/telemost/api.go +++ b/internal/telemost/api.go @@ -8,6 +8,7 @@ import ( "net/url" "github.com/google/uuid" + "github.com/openlibrecommunity/olcrtc/internal/protect" ) const apiBase = "https://cloud-api.yandex.ru/telemost_front/v2/telemost" @@ -44,7 +45,8 @@ func GetConnectionInfo(roomURL, displayName string) (*ConnectionInfo, error) { req.Header.Set("Origin", "https://telemost.yandex.ru") req.Header.Set("Referer", "https://telemost.yandex.ru/") - resp, err := http.DefaultClient.Do(req) + client := protect.NewHTTPClient() + resp, err := client.Do(req) if err != nil { return nil, err } diff --git a/internal/telemost/peer.go b/internal/telemost/peer.go index 578eb87..60b1073 100644 --- a/internal/telemost/peer.go +++ b/internal/telemost/peer.go @@ -1,9 +1,13 @@ package telemost import ( + "bytes" "context" + "encoding/json" "fmt" "log" + "math/rand/v2" + "net/http" "strings" "sync" "sync/atomic" @@ -11,9 +15,24 @@ import ( "github.com/google/uuid" "github.com/gorilla/websocket" + "github.com/openlibrecommunity/olcrtc/internal/logger" + "github.com/openlibrecommunity/olcrtc/internal/protect" "github.com/pion/webrtc/v4" ) +const ( + realDataChannelMessageLimit = 8192 + defaultSendDelayMin = 2 * time.Millisecond + defaultSendDelayMax = 12 * time.Millisecond + defaultTelemetryInterval = 20 * time.Second +) + +type TrafficShape struct { + MaxMessageSize int + MinDelay time.Duration + MaxDelay time.Duration +} + type Peer struct { roomURL string name string @@ -28,11 +47,21 @@ type Peer struct { reconnectCh chan struct{} closeCh chan struct{} keepAliveCh chan struct{} + telemetryCh chan struct{} lastReconnect time.Time reconnectCount int reconnectMu sync.Mutex + sessionMu sync.Mutex sendQueue chan []byte sendQueueClosed atomic.Bool + closed atomic.Bool + reconnecting atomic.Bool + telemetryActive atomic.Bool + ackMu sync.Mutex + ackWaiters map[string]chan struct{} + onEnded func(string) + trafficShape TrafficShape + sessionCloseCh chan struct{} wg sync.WaitGroup } @@ -47,6 +76,20 @@ func (p *Peer) GetBufferedAmount() uint64 { return 0 } +func (p *Peer) SetEndedCallback(cb func(string)) { + p.onEnded = cb +} + +func (p *Peer) SetTrafficShape(shape TrafficShape) { + if shape.MaxMessageSize <= 0 { + shape.MaxMessageSize = realDataChannelMessageLimit + } + if shape.MaxDelay < shape.MinDelay { + shape.MaxDelay = shape.MinDelay + } + p.trafficShape = shape +} + func NewPeer(roomURL, name string, onData func([]byte)) (*Peer, error) { conn, err := GetConnectionInfo(roomURL, name) if err != nil { @@ -54,18 +97,77 @@ func NewPeer(roomURL, name string, onData func([]byte)) (*Peer, error) { } return &Peer{ - roomURL: roomURL, - name: name, - conn: conn, - onData: onData, - reconnectCh: make(chan struct{}, 1), - closeCh: make(chan struct{}), - keepAliveCh: make(chan struct{}), - sendQueue: make(chan []byte, 5000), + roomURL: roomURL, + name: name, + conn: conn, + onData: onData, + reconnectCh: make(chan struct{}, 1), + closeCh: make(chan struct{}), + keepAliveCh: make(chan struct{}), + sessionCloseCh: make(chan struct{}), + telemetryCh: make(chan struct{}, 1), + sendQueue: make(chan []byte, 5000), + ackWaiters: make(map[string]chan struct{}), + trafficShape: TrafficShape{ + MaxMessageSize: realDataChannelMessageLimit, + MinDelay: defaultSendDelayMin, + MaxDelay: defaultSendDelayMax, + }, }, nil } +func closeSignal(ch chan struct{}) { + if ch == nil { + return + } + select { + case <-ch: + default: + close(ch) + } +} + +func (p *Peer) queueReconnect() { + if p.closed.Load() || p.reconnecting.Load() { + return + } + select { + case p.reconnectCh <- struct{}{}: + default: + } +} + +func (p *Peer) stopSession() { + p.stopTelemetry() + + p.sessionMu.Lock() + closeSignal(p.keepAliveCh) + closeSignal(p.sessionCloseCh) + p.sessionMu.Unlock() +} + +func (p *Peer) resetSession() (chan struct{}, chan struct{}) { + p.sessionMu.Lock() + defer p.sessionMu.Unlock() + + p.keepAliveCh = make(chan struct{}) + p.sessionCloseCh = make(chan struct{}) + return p.keepAliveCh, p.sessionCloseCh +} + +func (p *Peer) drainReconnectQueue() { + for { + select { + case <-p.reconnectCh: + default: + return + } + } +} + func (p *Peer) Connect(ctx context.Context) error { + p.closed.Store(false) + config := webrtc.Configuration{ ICEServers: []webrtc.ICEServer{ {URLs: []string{"stun:stun.rtc.yandex.net:3478"}}, @@ -74,6 +176,9 @@ func (p *Peer) Connect(ctx context.Context) error { } settingEngine := webrtc.SettingEngine{} + if protect.Protector != nil { + settingEngine.SetICEProxyDialer(protect.NewProxyDialer()) + } api := webrtc.NewAPI(webrtc.WithSettingEngine(settingEngine)) var err error @@ -81,14 +186,11 @@ func (p *Peer) Connect(ctx context.Context) error { if err != nil { return err } - + p.pcSub.OnConnectionStateChange(func(state webrtc.PeerConnectionState) { log.Printf("Subscriber PeerConnection state: %s", state.String()) - if state == webrtc.PeerConnectionStateFailed || state == webrtc.PeerConnectionStateDisconnected { - select { - case p.reconnectCh <- struct{}{}: - default: - } + if !p.closed.Load() && (state == webrtc.PeerConnectionStateFailed || state == webrtc.PeerConnectionStateDisconnected) { + p.queueReconnect() } }) @@ -96,14 +198,11 @@ func (p *Peer) Connect(ctx context.Context) error { if err != nil { return err } - + p.pcPub.OnConnectionStateChange(func(state webrtc.PeerConnectionState) { log.Printf("Publisher PeerConnection state: %s", state.String()) - if state == webrtc.PeerConnectionStateFailed || state == webrtc.PeerConnectionStateDisconnected { - select { - case p.reconnectCh <- struct{}{}: - default: - } + if !p.closed.Load() && (state == webrtc.PeerConnectionStateFailed || state == webrtc.PeerConnectionStateDisconnected) { + p.queueReconnect() } }) @@ -113,36 +212,36 @@ func (p *Peer) Connect(ctx context.Context) error { } dcReady := make(chan struct{}) + keepAliveCh, sessionCloseCh := p.resetSession() p.dc.OnOpen(func() { log.Println("DataChannel opened") - + numWorkers := 4 for i := 0; i < numWorkers; i++ { p.wg.Add(1) go func(workerID int) { defer p.wg.Done() - p.processSendQueue(workerID) + p.processSendQueue(workerID, sessionCloseCh) }(i) } - + p.wg.Add(1) go func() { defer p.wg.Done() - p.monitorQueue() + p.monitorQueue(sessionCloseCh) }() - + close(dcReady) }) - + p.dc.OnClose(func() { log.Println("DataChannel closed") if p.onReconnect != nil { log.Println("Calling reconnect callback for cleanup") p.onReconnect(nil) } - select { - case p.reconnectCh <- struct{}{}: - default: + if !p.closed.Load() { + p.queueReconnect() } }) @@ -156,9 +255,8 @@ func (p *Peer) Connect(ctx context.Context) error { log.Printf("Received datachannel: %s", dc.Label()) dc.OnClose(func() { log.Println("Received DataChannel closed - triggering reconnect") - select { - case p.reconnectCh <- struct{}{}: - default: + if !p.closed.Load() { + p.queueReconnect() } }) dc.OnMessage(func(msg webrtc.DataChannelMessage) { @@ -168,7 +266,11 @@ func (p *Peer) Connect(ctx context.Context) error { }) }) - ws, _, err := websocket.DefaultDialer.Dial(p.conn.ClientConfig.MediaServerURL, nil) + wsDialer := websocket.Dialer{ + NetDialContext: protect.DialContext, + HandshakeTimeout: 15 * time.Second, + } + ws, _, err := wsDialer.Dial(p.conn.ClientConfig.MediaServerURL, nil) if err != nil { return err } @@ -178,13 +280,13 @@ func (p *Peer) Connect(ctx context.Context) error { ws.SetReadDeadline(time.Now().Add(60 * time.Second)) return nil }) - + ws.SetReadDeadline(time.Now().Add(60 * time.Second)) p.wg.Add(1) go func() { defer p.wg.Done() - p.keepAlive() + p.keepAlive(keepAliveCh) }() if err := p.sendHello(); err != nil { @@ -213,11 +315,11 @@ func (p *Peer) Send(data []byte) error { if p.dc == nil || p.dc.ReadyState() != webrtc.DataChannelStateOpen { return fmt.Errorf("datachannel not ready") } - + if p.sendQueueClosed.Load() { return fmt.Errorf("send queue closed") } - + select { case p.sendQueue <- data: return nil @@ -242,13 +344,13 @@ func (p *Peer) sendHello() error { "name": p.name, "role": "SPEAKER", }, - "sendAudio": false, - "sendVideo": false, - "sendSharing": false, - "participantId": p.conn.PeerID, - "roomId": p.conn.RoomID, - "serviceName": "telemost", - "credentials": p.conn.Credentials, + "sendAudio": false, + "sendVideo": false, + "sendSharing": false, + "participantId": p.conn.PeerID, + "roomId": p.conn.RoomID, + "serviceName": "telemost", + "credentials": p.conn.Credentials, "capabilitiesOffer": map[string]interface{}{ "offerAnswerMode": []string{"SEPARATE"}, "initialSubscriberOffer": []string{"ON_HELLO"}, @@ -280,13 +382,12 @@ func (p *Peer) handleSignaling() { var msg map[string]interface{} if err := p.ws.ReadJSON(&msg); err != nil { log.Printf("WS read error: %v", err) - select { - case p.reconnectCh <- struct{}{}: - default: + if !p.closed.Load() { + p.queueReconnect() } return } - + p.wsMu.Lock() if p.ws != nil { p.ws.SetReadDeadline(time.Now().Add(60 * time.Second)) @@ -295,7 +396,12 @@ func (p *Peer) handleSignaling() { uid, _ := msg["uid"].(string) - if _, ok := msg["serverHello"]; ok { + if _, ok := msg["ack"]; ok { + p.resolveAck(uid) + } + + if serverHello, ok := msg["serverHello"].(map[string]interface{}); ok { + p.startTelemetry(serverHello) p.sendAck(uid) } @@ -307,11 +413,16 @@ func (p *Peer) handleSignaling() { p.sendAck(uid) } + if isConferenceEndMessage(msg) { + p.signalEnded("conference ended") + return + } + if _, ok := msg["ping"]; ok { p.sendPong(uid) continue } - + if _, ok := msg["pong"]; ok { p.sendAck(uid) continue @@ -421,9 +532,13 @@ func (p *Peer) handleICE(cand map[string]interface{}) { } func (p *Peer) sendAck(uid string) { + if uid == "" { + return + } + p.wsMu.Lock() defer p.wsMu.Unlock() - + p.ws.WriteJSON(map[string]interface{}{ "uid": uid, "ack": map[string]interface{}{ @@ -434,16 +549,206 @@ func (p *Peer) sendAck(uid string) { }) } +func (p *Peer) registerAckWaiter(uid string) chan struct{} { + ch := make(chan struct{}) + p.ackMu.Lock() + p.ackWaiters[uid] = ch + p.ackMu.Unlock() + return ch +} + +func (p *Peer) removeAckWaiter(uid string) { + p.ackMu.Lock() + delete(p.ackWaiters, uid) + p.ackMu.Unlock() +} + +func (p *Peer) waitForAck(uid string, ch <-chan struct{}, timeout time.Duration) bool { + if uid == "" { + return false + } + + defer func() { + p.removeAckWaiter(uid) + }() + + select { + case <-ch: + return true + case <-time.After(timeout): + return false + case <-p.closeCh: + return false + } +} + +func (p *Peer) resolveAck(uid string) { + if uid == "" { + return + } + + p.ackMu.Lock() + ch := p.ackWaiters[uid] + if ch != nil { + delete(p.ackWaiters, uid) + close(ch) + } + p.ackMu.Unlock() +} + func (p *Peer) sendPong(uid string) { p.wsMu.Lock() defer p.wsMu.Unlock() - + p.ws.WriteJSON(map[string]interface{}{ - "uid": uid, + "uid": uid, "pong": map[string]interface{}{}, }) } +func (p *Peer) startTelemetry(serverHello map[string]interface{}) { + cfg, ok := serverHello["telemetryConfiguration"].(map[string]interface{}) + if !ok { + return + } + + endpoint, _ := cfg["logEndpoint"].(string) + if endpoint == "" { + endpoint, _ = cfg["endpoint"].(string) + } + if endpoint == "" { + endpoint, _ = cfg["url"].(string) + } + if endpoint == "" { + logger.Verbose("Telemetry configuration has no endpoint; skipping XHR simulation") + return + } + + interval := defaultTelemetryInterval + if raw, ok := cfg["sendingInterval"].(float64); ok && raw > 0 { + interval = time.Duration(raw) * time.Millisecond + } + + if !p.telemetryActive.CompareAndSwap(false, true) { + return + } + + p.wg.Add(1) + go func() { + defer p.wg.Done() + defer p.telemetryActive.Store(false) + + ticker := time.NewTicker(interval) + defer ticker.Stop() + + p.sendTelemetry(endpoint, "join") + for { + select { + case <-ticker.C: + p.sendTelemetry(endpoint, "stats") + case <-p.telemetryCh: + p.sendTelemetry(endpoint, "leave") + return + case <-p.closeCh: + p.sendTelemetry(endpoint, "leave") + return + } + } + }() +} + +func (p *Peer) stopTelemetry() { + if p.telemetryActive.Load() { + select { + case p.telemetryCh <- struct{}{}: + default: + } + } +} + +func (p *Peer) sendTelemetry(endpoint, event string) { + body, err := json.Marshal(map[string]interface{}{ + "event": event, + "timestamp": time.Now().UnixMilli(), + "peerId": p.conn.PeerID, + "roomId": p.conn.RoomID, + "displayName": p.name, + "implementation": "olcrtc-go", + "dataChannel": map[string]interface{}{ + "bufferedAmount": p.GetBufferedAmount(), + "sendQueue": len(p.sendQueue), + }, + }) + if err != nil { + return + } + + req, err := http.NewRequest(http.MethodPost, endpoint, bytes.NewReader(body)) + if err != nil { + logger.Verbose("Telemetry request skipped: %v", err) + return + } + req.Header.Set("Content-Type", "application/json") + req.Header.Set("User-Agent", "Mozilla/5.0 (X11; Linux x86_64; rv:149.0) Gecko/20100101 Firefox/149.0") + req.Header.Set("Origin", "https://telemost.yandex.ru") + req.Header.Set("Referer", p.roomURL) + req.Header.Set("X-Requested-With", "XMLHttpRequest") + req.Header.Set("Client-Instance-Id", uuid.New().String()) + req.Header.Set("X-Telemost-Client-Version", "187.1.0") + req.Header.Set("Idempotency-Key", uuid.New().String()) + + client := protect.NewHTTPClient() + resp, err := client.Do(req) + if err != nil { + logger.Verbose("Telemetry send failed: %v", err) + return + } + defer resp.Body.Close() + if resp.StatusCode >= 400 { + logger.Verbose("Telemetry endpoint returned %s", resp.Status) + } +} + +func (p *Peer) signalEnded(reason string) { + log.Printf("Conference ended: %s", reason) + p.closed.Store(true) + p.stopTelemetry() + if p.onEnded != nil { + p.onEnded(reason) + } +} + +func isConferenceEndMessage(msg map[string]interface{}) bool { + for _, key := range []string{"conferenceClosed", "conferenceEnded", "roomClosed", "roomEnded", "callEnded"} { + if _, ok := msg[key]; ok { + return true + } + } + + if raw, ok := msg["conference"].(map[string]interface{}); ok { + if state, _ := raw["state"].(string); isEndedState(state) { + return true + } + } + + if raw, ok := msg["conferenceState"].(map[string]interface{}); ok { + if state, _ := raw["state"].(string); isEndedState(state) { + return true + } + } + + return false +} + +func isEndedState(state string) bool { + switch strings.ToLower(state) { + case "closed", "ended", "finished", "terminated": + return true + default: + return false + } +} + func (p *Peer) setupICEHandlers() { p.pcSub.OnICECandidate(func(c *webrtc.ICECandidate) { if c == nil { @@ -455,11 +760,11 @@ func (p *Peer) setupICEHandlers() { p.ws.WriteJSON(map[string]interface{}{ "uid": uuid.New().String(), "webrtcIceCandidate": map[string]interface{}{ - "candidate": init.Candidate, - "sdpMid": init.SDPMid, + "candidate": init.Candidate, + "sdpMid": init.SDPMid, "sdpMlineIndex": init.SDPMLineIndex, - "target": "SUBSCRIBER", - "pcSeq": 1, + "target": "SUBSCRIBER", + "pcSeq": 1, }, }) p.wsMu.Unlock() @@ -475,48 +780,63 @@ func (p *Peer) setupICEHandlers() { p.ws.WriteJSON(map[string]interface{}{ "uid": uuid.New().String(), "webrtcIceCandidate": map[string]interface{}{ - "candidate": init.Candidate, - "sdpMid": init.SDPMid, + "candidate": init.Candidate, + "sdpMid": init.SDPMid, "sdpMlineIndex": init.SDPMLineIndex, - "target": "PUBLISHER", - "pcSeq": 1, + "target": "PUBLISHER", + "pcSeq": 1, }, }) p.wsMu.Unlock() }) } -func (p *Peer) sendLeave() { +func (p *Peer) sendLeave(uid string) bool { p.wsMu.Lock() defer p.wsMu.Unlock() - + if p.ws == nil { log.Println("WebSocket already closed, cannot send leave") - return + return false } - + leave := map[string]interface{}{ - "uid": uuid.New().String(), + "uid": uid, "leave": map[string]interface{}{}, } - + if err := p.ws.WriteJSON(leave); err != nil { log.Printf("Failed to send leave: %v", err) + return false } else { log.Println("Sent leave message to server") } + return true } func (p *Peer) Close() error { log.Println("Closing peer connection...") - + + alreadyClosing := p.closed.Swap(true) p.sendQueueClosed.Store(true) - - log.Println("Sending leave message...") - p.sendLeave() - - time.Sleep(1 * time.Second) - + + if !alreadyClosing { + log.Println("Sending leave message...") + leaveUID := uuid.New().String() + leaveAck := p.registerAckWaiter(leaveUID) + if p.sendLeave(leaveUID) { + if p.waitForAck(leaveUID, leaveAck, 1500*time.Millisecond) { + log.Println("Leave acknowledged") + } else { + log.Println("Leave ack timeout") + } + } else { + p.removeAckWaiter(leaveUID) + } + + p.stopTelemetry() + } + log.Println("Closing channels...") if p.closeCh != nil { select { @@ -525,36 +845,36 @@ func (p *Peer) Close() error { close(p.closeCh) } } - + log.Println("Waiting for goroutines...") done := make(chan struct{}) go func() { p.wg.Wait() close(done) }() - + select { case <-done: log.Println("All goroutines finished") case <-time.After(2 * time.Second): log.Println("Goroutine wait timeout") } - + if p.dc != nil { log.Println("Closing DataChannel...") p.dc.Close() } - + if p.pcPub != nil { log.Println("Closing Publisher PeerConnection...") p.pcPub.Close() } - + if p.pcSub != nil { log.Println("Closing Subscriber PeerConnection...") p.pcSub.Close() } - + if p.ws != nil { log.Println("Closing WebSocket...") p.wsMu.Lock() @@ -562,15 +882,15 @@ func (p *Peer) Close() error { p.ws.Close() p.wsMu.Unlock() } - + log.Println("Peer closed") return nil } -func (p *Peer) keepAlive() { +func (p *Peer) keepAlive(keepAliveCh <-chan struct{}) { wsPingTicker := time.NewTicker(30 * time.Second) defer wsPingTicker.Stop() - + appPingTicker := time.NewTicker(5 * time.Second) defer appPingTicker.Stop() @@ -582,10 +902,7 @@ func (p *Peer) keepAlive() { if err := p.ws.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(10*time.Second)); err != nil { log.Printf("WS Ping error: %v", err) p.wsMu.Unlock() - select { - case p.reconnectCh <- struct{}{}: - default: - } + p.queueReconnect() return } } @@ -599,15 +916,12 @@ func (p *Peer) keepAlive() { }); err != nil { log.Printf("App Ping error: %v", err) p.wsMu.Unlock() - select { - case p.reconnectCh <- struct{}{}: - default: - } + p.queueReconnect() return } } p.wsMu.Unlock() - case <-p.keepAliveCh: + case <-keepAliveCh: return case <-p.closeCh: return @@ -617,49 +931,51 @@ func (p *Peer) keepAlive() { func (p *Peer) reconnect(ctx context.Context) error { log.Println("Reconnecting...") - - p.sendLeave() + p.reconnecting.Store(true) + defer p.reconnecting.Store(false) + + p.sendLeave(uuid.New().String()) time.Sleep(500 * time.Millisecond) - - close(p.keepAliveCh) - + + p.stopSession() + if p.dc != nil { p.dc.Close() } - + if p.pcPub != nil { p.pcPub.Close() } - + if p.pcSub != nil { p.pcSub.Close() } - + if p.ws != nil { p.wsMu.Lock() p.ws.WriteControl(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""), time.Now().Add(time.Second)) p.ws.Close() p.wsMu.Unlock() } - + time.Sleep(3 * time.Second) - - p.keepAliveCh = make(chan struct{}) - + conn, err := GetConnectionInfo(p.roomURL, p.name) if err != nil { return err } p.conn = conn - + if err := p.Connect(ctx); err != nil { return err } - + if p.onReconnect != nil { p.onReconnect(p.dc) } - + + p.drainReconnectQueue() + return nil } @@ -670,7 +986,7 @@ func (p *Peer) SetReconnectCallback(cb func(*webrtc.DataChannel)) { func (p *Peer) WatchConnection(ctx context.Context) { const maxReconnects = 10 const reconnectWindow = 5 * time.Minute - + for { select { case <-p.reconnectCh: @@ -679,28 +995,31 @@ func (p *Peer) WatchConnection(ctx context.Context) { if now.Sub(p.lastReconnect) > reconnectWindow { p.reconnectCount = 0 } - + if p.reconnectCount >= maxReconnects { log.Printf("Max reconnect attempts (%d) reached, stopping", maxReconnects) p.reconnectMu.Unlock() return } - + p.reconnectCount++ p.lastReconnect = now p.reconnectMu.Unlock() - + backoff := time.Duration(p.reconnectCount) * 2 * time.Second if backoff > 30*time.Second { backoff = 30 * time.Second } - + for { if err := p.reconnect(ctx); err != nil { log.Printf("Reconnect failed: %v, retrying in %v...", err, backoff) time.Sleep(backoff) continue } + p.reconnectMu.Lock() + p.reconnectCount = 0 + p.reconnectMu.Unlock() log.Println("Reconnected successfully") break } @@ -712,48 +1031,76 @@ func (p *Peer) WatchConnection(ctx context.Context) { } } -func (p *Peer) processSendQueue(workerID int) { +func (p *Peer) processSendQueue(workerID int, sessionCloseCh <-chan struct{}) { log.Printf("[WORKER-%d] Started", workerID) defer log.Printf("[WORKER-%d] Stopped", workerID) - + for { select { - case data := <-p.sendQueue: + case data, ok := <-p.sendQueue: + if !ok { + return + } if p.dc == nil || p.dc.ReadyState() != webrtc.DataChannelStateOpen { continue } - - start := time.Now() - - for p.dc.BufferedAmount() > 4*1024*1024 { - time.Sleep(10 * time.Millisecond) - if time.Since(start) > 10*time.Second { - log.Printf("[WORKER-%d] Buffer wait timeout, dropping packet size=%d", workerID, len(data)) - break - } - } - - if time.Since(start) > 10*time.Second { + if p.trafficShape.MaxMessageSize > 0 && len(data) > p.trafficShape.MaxMessageSize { + log.Printf("[WORKER-%d] Refusing oversized DataChannel message size=%d limit=%d", workerID, len(data), p.trafficShape.MaxMessageSize) continue } - + if delay := p.nextSendDelay(); delay > 0 { + time.Sleep(delay) + } + + // Wait until SCTP buffer drains. Dropping here would corrupt the + // carried TCP streams (the mux is a reliable transport); large + // downloads like Instagram/Twitter assets would hang forever + // waiting for the missing bytes. Backpressure already propagates + // upstream via CanSend() / the sendQueue length. + // Threshold is high (4MB) because a tight limit serialises sends: + // workers would pause on every frame, turning throughput into + // one chunk per 10ms drain cycle (~400KB/s). + waitStart := time.Now() + for p.dc.BufferedAmount() > 4*1024*1024 { + if p.dc.ReadyState() != webrtc.DataChannelStateOpen { + break + } + time.Sleep(10 * time.Millisecond) + } + if waited := time.Since(waitStart); waited > 500*time.Millisecond { + logger.Verbose("[WORKER-%d] Buffer drained after %v", workerID, waited) + } + + if p.dc == nil || p.dc.ReadyState() != webrtc.DataChannelStateOpen { + continue + } + + sendStart := time.Now() if err := p.dc.Send(data); err != nil { log.Printf("[WORKER-%d] Send error: %v", workerID, err) - } else if time.Since(start) > 100*time.Millisecond { - log.Printf("[WORKER-%d] Sent %d bytes in %v (buffered: %d)", - workerID, len(data), time.Since(start), p.dc.BufferedAmount()) + } else { + elapsed := time.Since(sendStart) + if elapsed > 50*time.Millisecond { + log.Printf("[WORKER-%d] Sent %d bytes in %v (buffered: %d)", + workerID, len(data), elapsed, p.dc.BufferedAmount()) + } else { + logger.Verbose("[WORKER-%d] Sent %d bytes (buffered: %d)", + workerID, len(data), p.dc.BufferedAmount()) + } } - + + case <-sessionCloseCh: + return case <-p.closeCh: return } } } -func (p *Peer) monitorQueue() { +func (p *Peer) monitorQueue(sessionCloseCh <-chan struct{}) { ticker := time.NewTicker(3 * time.Second) defer ticker.Stop() - + for { select { case <-ticker.C: @@ -765,6 +1112,8 @@ func (p *Peer) monitorQueue() { if queueLen > 800 || buffered > 3*1024*1024 { log.Printf("[QUEUE_MONITOR] queue_len=%d dc_buffered=%d MB", queueLen, buffered/(1024*1024)) } + case <-sessionCloseCh: + return case <-p.closeCh: return } @@ -779,3 +1128,15 @@ func (p *Peer) CanSend() bool { } return queueLen < 1000 && buffered < 3*1024*1024 } + +func (p *Peer) nextSendDelay() time.Duration { + minDelay := p.trafficShape.MinDelay + maxDelay := p.trafficShape.MaxDelay + if maxDelay <= 0 { + return 0 + } + if maxDelay <= minDelay { + return maxDelay + } + return minDelay + time.Duration(rand.Int64N(int64(maxDelay-minDelay))) +} diff --git a/mobile/mobile.go b/mobile/mobile.go new file mode 100644 index 0000000..e60cac7 --- /dev/null +++ b/mobile/mobile.go @@ -0,0 +1,220 @@ +// Package mobile provides a gomobile-compatible API for olcRTC. +// Build with: gomobile bind -target=android ./mobile +package mobile + +import ( + "context" + "errors" + "log" + "sync" + "time" + + "github.com/openlibrecommunity/olcrtc/internal/client" + "github.com/openlibrecommunity/olcrtc/internal/logger" + "github.com/openlibrecommunity/olcrtc/internal/protect" +) + +// SocketProtector protects sockets from VPN routing on Android. +// Implement this interface in Kotlin/Java and pass to SetProtector. +type SocketProtector interface { + Protect(fd int) bool +} + +// LogWriter receives log messages from olcRTC. +type LogWriter interface { + WriteLog(msg string) +} + +var ( + errAlreadyRunning = errors.New("olcRTC already running") + errRoomIDRequired = errors.New("roomID is required") + errKeyHexRequired = errors.New("keyHex is required") + errNotRunning = errors.New("olcRTC is not running") + errStoppedBeforeReady = errors.New("olcRTC stopped before becoming ready") + errStartTimedOut = errors.New("olcRTC start timed out") +) + +//nolint:gochecknoglobals // Mobile bindings expose a singleton runtime controlled by the embedding app. +var ( + mu sync.Mutex + cancel context.CancelFunc + done chan struct{} + ready chan struct{} + errRun error +) + +// SetProtector sets the Android VPN socket protector. +// Must be called before Start. +func SetProtector(p SocketProtector) { + if p == nil { + protect.Protector = nil + return + } + protect.Protector = func(fd int) bool { + return p.Protect(fd) + } +} + +// SetLogWriter sets a custom log writer for olcRTC output. +func SetLogWriter(w LogWriter) { + if w != nil { + log.SetOutput(&logBridge{w: w}) + } +} + +// SetDebug enables or disables verbose logging. +func SetDebug(enabled bool) { + logger.SetVerbose(enabled) + if enabled { + log.SetFlags(log.Ltime | log.Lshortfile) + return + } + + log.SetFlags(log.Ltime) +} + +// Start launches the olcRTC client in background. +// roomID: Telemost room ID (e.g. "xxx-xxx-xxx") +// keyHex: 64-char hex encryption key +// socksPort: local SOCKS5 proxy port (e.g. 10808) +// duo: use dual channels for higher throughput +// socksUser/socksPass: SOCKS5 credentials (empty = no auth). +func Start(roomID, keyHex string, socksPort int, duo bool, socksUser, socksPass string) error { + mu.Lock() + defer mu.Unlock() + + switch { + case cancel != nil: + return errAlreadyRunning + case roomID == "": + return errRoomIDRequired + case keyHex == "": + return errKeyHexRequired + } + + roomURL := "https://telemost.yandex.ru/j/" + roomID + + ctx, cancelFunc := context.WithCancel(context.Background()) + cancel = cancelFunc + done = make(chan struct{}) + ready = make(chan struct{}) + localReady := ready + errRun = nil + + var readyOnce sync.Once + go func() { + defer cancelFunc() + + err := client.RunWithReady( + ctx, + roomURL, + keyHex, + socksPort, + duo, + "", + socksUser, + socksPass, + func() { + readyOnce.Do(func() { + close(localReady) + }) + }, + ) + + mu.Lock() + cancel = nil + errRun = err + mu.Unlock() + close(done) + }() + + return nil +} + +// WaitReady blocks until the Telemost peers are connected and the local SOCKS5 listener is ready. +// +//nolint:cyclop // The control flow is intentionally linear so mobile callers can observe each startup state clearly. +func WaitReady(timeoutMillis int) error { + mu.Lock() + r := ready + d := done + runErr := errRun + running := cancel != nil + mu.Unlock() + + if r == nil { + if runErr != nil { + return runErr + } + + return errNotRunning + } + + select { + case <-r: + return nil + default: + } + + if !running { + if runErr != nil { + return runErr + } + + return errStoppedBeforeReady + } + + timer := time.NewTimer(time.Duration(timeoutMillis) * time.Millisecond) + defer timer.Stop() + + select { + case <-r: + return nil + case <-d: + mu.Lock() + runErr = errRun + mu.Unlock() + if runErr != nil { + return runErr + } + + return errStoppedBeforeReady + case <-timer.C: + return errStartTimedOut + } +} + +// Stop gracefully stops the olcRTC client. +func Stop() { + mu.Lock() + cancelFunc := cancel + doneCh := done + mu.Unlock() + + if cancelFunc == nil { + return + } + + cancelFunc() + + if doneCh != nil { + <-doneCh + } +} + +// IsRunning returns true if the olcRTC client is active. +func IsRunning() bool { + mu.Lock() + defer mu.Unlock() + return cancel != nil +} + +// logBridge adapts LogWriter to io.Writer for log package. +type logBridge struct { + w LogWriter +} + +func (b *logBridge) Write(p []byte) (int, error) { + b.w.WriteLog(string(p)) + return len(p), nil +} diff --git a/script/docker/olcrtc-entrypoint.sh b/script/docker/olcrtc-entrypoint.sh new file mode 100644 index 0000000..bce7aa5 --- /dev/null +++ b/script/docker/olcrtc-entrypoint.sh @@ -0,0 +1,79 @@ +#!/bin/sh +set -eu + +die() { + echo "olcrtc-entrypoint: $*" >&2 + exit 1 +} + +bool_flag() { + case "${1:-}" in + 1|true|TRUE|yes|YES|on|ON) return 0 ;; + *) return 1 ;; + esac +} + +make_key() { + if command -v od >/dev/null 2>&1; then + od -An -N32 -tx1 /dev/urandom | tr -d ' \n' + else + hexdump -n 32 -e '32/1 "%02x"' /dev/urandom + fi +} + +if [ "${1:-}" = "olcrtc" ]; then + shift +fi + +if [ "$#" -gt 0 ]; then + exec /usr/local/bin/olcrtc "$@" +fi + +mode="${OLCRTC_MODE:-srv}" +room_id="${OLCRTC_ROOM_ID:-${ROOM_ID:-}}" +provider="${OLCRTC_PROVIDER:-telemost}" +data_dir="${OLCRTC_DATA_DIR:-/usr/share/olcrtc}" +dns_server="${OLCRTC_DNS:-1.1.1.1:53}" +key="${OLCRTC_KEY:-${KEY:-}}" +key_file="${OLCRTC_KEY_FILE:-/var/lib/olcrtc/key.hex}" + +[ "$mode" = "srv" ] || die "server image defaults to OLCRTC_MODE=srv; got '$mode'" +[ -n "$room_id" ] || die "set OLCRTC_ROOM_ID to the Telemost room id" + +if [ -z "$key" ]; then + if [ -s "$key_file" ]; then + key="$(tr -d '[:space:]' < "$key_file")" + else + key="$(make_key)" + umask 077 + printf '%s\n' "$key" > "$key_file" + echo "olcrtc-entrypoint: generated encryption key and saved it to $key_file" >&2 + echo "olcrtc-entrypoint: OLCRTC_KEY=$key" >&2 + fi +fi + +case "$key" in + *[!0-9a-fA-F]*) + die "OLCRTC_KEY must be a 64-character hex string" + ;; +esac + +[ "${#key}" -eq 64 ] || die "OLCRTC_KEY must be 64 hex characters" + +set -- /usr/local/bin/olcrtc \ + -mode "$mode" \ + -provider "$provider" \ + -id "$room_id" \ + -key "$key" \ + -data "$data_dir" \ + -dns "$dns_server" + +if bool_flag "${OLCRTC_DUO:-}"; then + set -- "$@" -duo +fi + +if bool_flag "${OLCRTC_DEBUG:-}"; then + set -- "$@" -debug +fi + +exec "$@" diff --git a/script/docker/olcrtc-healthcheck.sh b/script/docker/olcrtc-healthcheck.sh new file mode 100644 index 0000000..e21e47e --- /dev/null +++ b/script/docker/olcrtc-healthcheck.sh @@ -0,0 +1,8 @@ +#!/bin/sh +set -eu + +exe="$(readlink /proc/1/exe 2>/dev/null || true)" +case "$exe" in + */olcrtc) exit 0 ;; + *) exit 1 ;; +esac