diff --git a/.gitignore b/.gitignore index 61fcb93..e6d1938 100644 --- a/.gitignore +++ b/.gitignore @@ -238,15 +238,12 @@ profile.cov go.work go.work.sum -# env file +# files .env - -# Editor/IDE -# .idea/ -# .vscode/ build/ GEMINI.md code/package-lock.json !cmd/olcrtc/ !cmd/olcrtc/main_test.go !pkg/ +cpu.prof diff --git a/code/secretny_ddoos.py b/code/secretny_ddoos.py deleted file mode 100755 index 2fe92be..0000000 --- a/code/secretny_ddoos.py +++ /dev/null @@ -1,154 +0,0 @@ -#!/usr/bin/env python3 -import asyncio -import re -import time - -import requests - -API_BASE = "https://stream.wb.ru" -OUTPUT_FILE = "/tmp/ti_ymresh_ot_spida.txt" -HITS_FILE = "/tmp/ti_ymresh_v_mukah.txt" - -PATTERNS = [ - re.compile(r"dead", re.IGNORECASE), - re.compile(r"beef", re.IGNORECASE), - re.compile(r"deadbeef", re.IGNORECASE), -] - -CONCURRENCY = 50 -TOTAL_ATTEMPTS = 0 -PRINT_EVERY = 100 - - -def _create_room_sync(idx: int) -> str | None: - headers = { - "User-Agent": "Mozilla/5.0 (Linux x86_64)", - "Content-Type": "application/json", - } - try: - reg = requests.post( - f"{API_BASE}/auth/api/v1/auth/user/guest-register", - json={ - "displayName": f"OlcRTC-DDoos-{idx}", - "device": { - "deviceName": "Linux", - "deviceType": "PARTICIPANT_DEVICE_TYPE_WEB_DESKTOP", - }, - }, - headers=headers, - timeout=15, - ) - reg.raise_for_status() - headers["Authorization"] = f"Bearer {reg.json()['accessToken']}" - - room_req = requests.post( - f"{API_BASE}/api-room/api/v2/room", - json={ - "roomType": "ROOM_TYPE_ALL_ON_SCREEN", - "roomPrivacy": "ROOM_PRIVACY_FREE", - }, - headers=headers, - timeout=15, - ) - room_req.raise_for_status() - return room_req.json()["roomId"] - except Exception: - return None - - -def _check_hit(room_id: str) -> str | None: - best = None - for p in PATTERNS: - if p.search(room_id): - if p.pattern.lower() == "deadbeef": - return "DEADBEEF-JP" - best = p.pattern - return best - - -class Stats: - __slots__ = ("attempts", "ok", "fail", "hits", "started") - - def __init__(self) -> None: - self.attempts = 0 - self.ok = 0 - self.fail = 0 - self.hits = 0 - self.started = time.time() - - -async def worker(sem: asyncio.Semaphore, stats: Stats, idx: int) -> None: - async with sem: - loop = asyncio.get_running_loop() - room_id = await loop.run_in_executor(None, _create_room_sync, idx) - stats.attempts += 1 - - if not room_id: - stats.fail += 1 - else: - stats.ok += 1 - with open(OUTPUT_FILE, "a", encoding="utf-8") as f: - f.write(room_id + "\n") - - hit = _check_hit(room_id) - if hit: - stats.hits += 1 - line = f"[{hit}] {room_id}" - print(f"\n!!! HIT !!! {line}\n") - with open(HITS_FILE, "a", encoding="utf-8") as f: - f.write(line + "\n") - - if stats.attempts % PRINT_EVERY == 0: - elapsed = time.time() - stats.started - rps = stats.attempts / elapsed if elapsed else 0 - print( - f"[{stats.attempts}] ok={stats.ok} fail={stats.fail} " - f"hits={stats.hits} rps={rps:.1f}" - ) - - -async def main() -> None: - print("--- imba: DEADBEEF ---") - print(f"all room -> {OUTPUT_FILE}") - print(f"heat dead/beef-> {HITS_FILE}") - print(f"parralel : {CONCURRENCY}") - print(f"limit : {'∞' if TOTAL_ATTEMPTS == 0 else TOTAL_ATTEMPTS}") - print() - - sem = asyncio.Semaphore(CONCURRENCY) - stats = Stats() - idx = 0 - - try: - if TOTAL_ATTEMPTS > 0: - tasks = [ - asyncio.create_task(worker(sem, stats, i)) - for i in range(1, TOTAL_ATTEMPTS + 1) - ] - await asyncio.gather(*tasks) - else: - running: set[asyncio.Task] = set() - while True: - while len(running) < CONCURRENCY * 2: - idx += 1 - running.add(asyncio.create_task(worker(sem, stats, idx))) - done, running = await asyncio.wait( - running, return_when=asyncio.FIRST_COMPLETED - ) - except KeyboardInterrupt: - pass - finally: - elapsed = time.time() - stats.started - print("\n--- ITOGY ---") - print(f"runs : {stats.attempts}") - print(f"OK : {stats.ok}") - print(f"FAIL : {stats.fail}") - print(f"heatS : {stats.hits}") - print(f"timw : {elapsed:.1f}s") - - -if __name__ == "__main__": - try: - asyncio.run(main()) - except KeyboardInterrupt: - pass diff --git a/docs/about.md b/docs/about.md index 4254443..f415d20 100644 --- a/docs/about.md +++ b/docs/about.md @@ -1,3 +1,14 @@ +
+ + + +![License](https://img.shields.io/badge/license-WTFPL-0D1117?style=flat-square&logo=open-source-initiative&logoColor=green&labelColor=0D1117) +![Golang](https://img.shields.io/badge/-Golang-0D1117?style=flat-square&logo=go&logoColor=00A7D0) + +
+ + + # olcRTC - общее описание `olcRTC` (OpenLibreCommunity RTC) - зашифрованный TCP-over-WebRTC туннель. Он маскирует трафик под обычное участие в WebRTC/SFU-сервисе: Jitsi Meet, Yandex Telemost или WbStream. @@ -174,8 +185,6 @@ data: data | `script` | интерактивные launchers и Docker entrypoint | | `docs` | документация и примеры YAML | -Подробная карта для разработки: [project-map.md](project-map.md). - ## Сборка ```bash @@ -244,7 +253,7 @@ mage e2e Real-provider E2E включаются через переменные: ```bash -E2E_CARRIERS=wbstream E2E_TRANSPORTS=vp8channel mage e2e +E2E_CARRIERS=wbstream E2E_TRANSPORTS= vp8channel mage e2e ``` ## Частые проблемы diff --git a/docs/configuration.md b/docs/configuration.md index c2b373f..2090e3e 100644 --- a/docs/configuration.md +++ b/docs/configuration.md @@ -1,3 +1,13 @@ +
+ + + +![License](https://img.shields.io/badge/license-WTFPL-0D1117?style=flat-square&logo=open-source-initiative&logoColor=green&labelColor=0D1117) +![Golang](https://img.shields.io/badge/-Golang-0D1117?style=flat-square&logo=go&logoColor=00A7D0) + +
+ + # Настройка YAML `olcrtc` читает runtime-настройки из одного YAML-файла. CLI принимает ровно один аргумент - путь к конфигу; отдельных CLI-флагов для режима, транспорта и провайдера больше нет. @@ -9,31 +19,31 @@ olcrtc /etc/olcrtc/client.yaml Готовые примеры: -- [`server.jitsi.datachannel.yaml`](./examples/server.jitsi.datachannel.yaml) -- [`client.jitsi.datachannel.yaml`](./examples/client.jitsi.datachannel.yaml) -- [`server.jitsi.videochannel.yaml`](./examples/server.jitsi.videochannel.yaml) -- [`client.jitsi.videochannel.yaml`](./examples/client.jitsi.videochannel.yaml) -- [`server.jitsi.seichannel.yaml`](./examples/server.jitsi.seichannel.yaml) -- [`client.jitsi.seichannel.yaml`](./examples/client.jitsi.seichannel.yaml) -- [`server.jitsi.vp8channel.yaml`](./examples/server.jitsi.vp8channel.yaml) -- [`client.jitsi.vp8channel.yaml`](./examples/client.jitsi.vp8channel.yaml) -- [`server.telemost.datachannel.yaml`](./examples/server.telemost.datachannel.yaml) -- [`client.telemost.datachannel.yaml`](./examples/client.telemost.datachannel.yaml) -- [`server.telemost.videochannel.yaml`](./examples/server.telemost.videochannel.yaml) -- [`client.telemost.videochannel.yaml`](./examples/client.telemost.videochannel.yaml) -- [`server.telemost.seichannel.yaml`](./examples/server.telemost.seichannel.yaml) -- [`client.telemost.seichannel.yaml`](./examples/client.telemost.seichannel.yaml) -- [`server.telemost.vp8channel.yaml`](./examples/server.telemost.vp8channel.yaml) -- [`client.telemost.vp8channel.yaml`](./examples/client.telemost.vp8channel.yaml) -- [`server.wbstream.datachannel.yaml`](./examples/server.wbstream.datachannel.yaml) -- [`client.wbstream.datachannel.yaml`](./examples/client.wbstream.datachannel.yaml) -- [`server.wbstream.videochannel.yaml`](./examples/server.wbstream.videochannel.yaml) -- [`client.wbstream.videochannel.yaml`](./examples/client.wbstream.videochannel.yaml) -- [`server.wbstream.seichannel.yaml`](./examples/server.wbstream.seichannel.yaml) -- [`client.wbstream.seichannel.yaml`](./examples/client.wbstream.seichannel.yaml) -- [`server.wbstream.vp8channel.yaml`](./examples/server.wbstream.vp8channel.yaml) -- [`client.wbstream.vp8channel.yaml`](./examples/client.wbstream.vp8channel.yaml) -- [`failover.yaml`](./examples/failover.yaml) +- [`server.jitsi.datachannel.yaml`](./examples/server.jitsi.datachannel.yaml) - jitsi + datachannel srv +- [`client.jitsi.datachannel.yaml`](./examples/client.jitsi.datachannel.yaml) - jitsi + datachannel cnc +- [`server.jitsi.videochannel.yaml`](./examples/server.jitsi.videochannel.yaml) - jitsi + videochannel srv +- [`client.jitsi.videochannel.yaml`](./examples/client.jitsi.videochannel.yaml) - jitsi + videochannel cnc +- [`server.jitsi.seichannel.yaml`](./examples/server.jitsi.seichannel.yaml) - jitsi + seichannel srv +- [`client.jitsi.seichannel.yaml`](./examples/client.jitsi.seichannel.yaml) - jitsi + seichannel cnc +- [`server.jitsi.vp8channel.yaml`](./examples/server.jitsi.vp8channel.yaml) - jitsi + vp8channel srv +- [`client.jitsi.vp8channel.yaml`](./examples/client.jitsi.vp8channel.yaml) - jitsi + vp8channel cnc +- [`server.telemost.datachannel.yaml`](./examples/server.telemost.datachannel.yaml) - telemost + datachannel srv +- [`client.telemost.datachannel.yaml`](./examples/client.telemost.datachannel.yaml) - telemost + datachannel cnc +- [`server.telemost.videochannel.yaml`](./examples/server.telemost.videochannel.yaml) - telemost + videochannel srv +- [`client.telemost.videochannel.yaml`](./examples/client.telemost.videochannel.yaml) - telemost + videochannel cnc +- [`server.telemost.seichannel.yaml`](./examples/server.telemost.seichannel.yaml) - telemost + seichannel srv +- [`client.telemost.seichannel.yaml`](./examples/client.telemost.seichannel.yaml) - telemost + seichannel +- [`server.telemost.vp8channel.yaml`](./examples/server.telemost.vp8channel.yaml) - telemost + vp8channel srv +- [`client.telemost.vp8channel.yaml`](./examples/client.telemost.vp8channel.yaml) - telemost + vp8channel cnc +- [`server.wbstream.datachannel.yaml`](./examples/server.wbstream.datachannel.yaml) - wbstream + datachannel srv +- [`client.wbstream.datachannel.yaml`](./examples/client.wbstream.datachannel.yaml) - wbstream + datachannel cnc +- [`server.wbstream.videochannel.yaml`](./examples/server.wbstream.videochannel.yaml) - wbstream + videochannel srv +- [`client.wbstream.videochannel.yaml`](./examples/client.wbstream.videochannel.yaml) - wbstream + videochannel cnc +- [`server.wbstream.seichannel.yaml`](./examples/server.wbstream.seichannel.yaml) - wbstream + seichannel srv +- [`client.wbstream.seichannel.yaml`](./examples/client.wbstream.seichannel.yaml) - wbstream + seichannel cnc +- [`server.wbstream.vp8channel.yaml`](./examples/server.wbstream.vp8channel.yaml) - wbstream + vp8channel srv +- [`client.wbstream.vp8channel.yaml`](./examples/client.wbstream.vp8channel.yaml) - wbstream + vp8channel cnc +- [`failover.yaml`](./examples/failover.yaml) - failover ## Схема @@ -80,7 +90,7 @@ mode: srv auth: provider: jitsi room: - id: "https://meet.cryptopro.ru/myroom" + id: "https://meet.cryptopro.ru/REPLACE_ME_WITH_ROOM_ID" crypto: key: "REPLACE_ME_WITH_64_HEX_CHARS" net: @@ -96,7 +106,7 @@ mode: cnc auth: provider: jitsi room: - id: "https://meet.cryptopro.ru/myroom" + id: "https://meet.cryptopro.ru/REPLACE_ME_WITH_ROOM_ID" crypto: key: "REPLACE_ME_WITH_64_HEX_CHARS" net: diff --git a/docs/fast.md b/docs/fast.md index af30863..50ecf22 100644 --- a/docs/fast.md +++ b/docs/fast.md @@ -22,25 +22,25 @@ ### git ```sh -apt install git # Debian / Ubuntu / Mint -pacman -S git # Arch / CacheOS / Manjaro +apt install git # Debian / Ubuntu / Mint +pacman -S git # Arch / CacheOS / Manjaro dnf install git # Fedora / RHEL / CentOS ``` ### podman ```sh -apt install podman # Debian / Ubuntu / Mint -pacman -S podman # Arch / CacheOS / Manjaro -dnf install podman # Fedora / RHEL / CentOS +apt install podman # Debian / Ubuntu / Mint +pacman -S podman # Arch / CacheOS / Manjaro +dnf install podman # Fedora / RHEL / CentOS ``` ### curl ```sh -apt install curl # Debian / Ubuntu/ Mint -pacman -S curl # Arch / CacheOS / Manjaro -dnf install curl # Fedora +apt install curl # Debian / Ubuntu / Mint +pacman -S curl # Arch / CacheOS / Manjaro +dnf install curl # Fedora / RHEL / CentOS ``` ### swap (ОЗУ) @@ -74,8 +74,6 @@ cd olcrtc ./script/srv.sh ``` -Скрипт задаст несколько вопросов. - #### Флаги `srv.sh` | Флаг | Что делает | @@ -102,7 +100,7 @@ cd olcrtc Выбери сервис. Полную матрицу совместимости смотри в [settings.md](settings.md). -**По умолчанию `jitsi`** — стабильно работает на datachannel против self-hosted и публичных Jitsi инстансов (например `meet.cryptopro.ru`). +**По умолчанию `jitsi`** - стабильно работает на datachannel против self-hosted и публичных Jitsi инстансов (например `meet.cryptopro.ru`). ### Transport (как именно передавать данные) @@ -121,7 +119,7 @@ cd olcrtc - **seichannel** - работает только с wbstream, медленный, но мелкий пинг. - **videochannel** - работает с wbstream стабильно, с telemost по возможности; самый медленный и с большим пингом. -**Рекомендуемая комбинация: `jitsi + datachannel`** — работает стабильно, не требует регистрации, легко поднимать на своём сервере. Альтернатива: `wbstream + vp8channel`. +**Рекомендуемая комбинация: `jitsi + datachannel`** - работает стабильно, не требует регистрации, легко поднимать на своём сервере. Альтернатива: `wbstream + vp8channel`. ### Room ID @@ -131,7 +129,7 @@ cd olcrtc Для **jitsi** — полный URL комнаты в формате `https://host/room` (например `https://meet.cryptopro.ru/myroom`). Имя комнаты придумывается на лету, без регистрации. Подойдёт любой публичный или self-hosted Jitsi Meet. -Для **telemost** и **wbstream** - создай руму через сайт ([телемост](https://telemost.yandex.ru/), [wbstream](https://stream.wb.ru)) и вставь её ID. +Для **telemost** и **wbstream** - создай руму через сайт ([telemost](https://telemost.yandex.ru/), [wbstream](https://stream.wb.ru)) и вставь её ID. ### DNS diff --git a/docs/manual.md b/docs/manual.md index f47e1ac..fd2107e 100644 --- a/docs/manual.md +++ b/docs/manual.md @@ -12,10 +12,22 @@ Этот способ для тех кто хочет собрать бинарник руками без Docker/Podman. Нужен Go 1.25+, mage, git. -Проект в бете. По проблемам: t.me/openlibrecommunity +--- + + +### swap (ОЗУ) + +Если у вас меньше 4ГБ оперативной памяти, сборка может вылетать. **Обязательно включите SWAP**: + +```bash +sudo fallocate -l 4G /swapfile && sudo chmod 600 /swapfile && sudo mkswap /swapfile && sudo swapon /swapfile +``` + --- +## Что нужно установить + ## Шаг 1: Установить git ```sh @@ -31,7 +43,7 @@ dnf install git # Fedora / RHEL / CentOS ### Arch / Fedora (всё просто) ```sh -pacman -S go # Arch / CachyOS / Manjaro +pacman -S go # Arch / CachyOS / Manjaro dnf install go # Fedora / RHEL / CentOS ``` @@ -106,7 +118,6 @@ git clone https://github.com/openlibrecommunity/olcrtc --recurse-submodules cd olcrtc ``` -`--recurse-submodules` обязателен - без него videochannel не соберётся. --- @@ -121,9 +132,6 @@ mage cross # все платформы сразу (если собираешь ``` build/olcrtc-linux-amd64 -build/olcrtc-linux-arm64 -build/olcrtc-windows-amd64.exe -build/olcrtc-darwin-amd64 ``` --- @@ -313,12 +321,6 @@ curl --socks5-hostname 127.0.0.1:8808 https://icanhazip.com Должен вернуть IP сервера. -Или выставить переменную чтобы весь трафик шёл через прокси: - -```sh -export all_proxy=socks5h://127.0.0.1:8808 -curl https://icanhazip.com -``` --- diff --git a/docs/settings.md b/docs/settings.md index 24cd290..bf067fd 100644 --- a/docs/settings.md +++ b/docs/settings.md @@ -22,15 +22,15 @@ **Легенда:** - `+` - работает (pass в E2E тестах) - `-` - не работает / не поддерживается (fail в E2E тестах) -- `~` - нестабильно (может работать, но нестабильно) +- `~` - нестабильно (может работать) -**Telemost:** только vp8channel стабильно проходит. DataChannel удалён из Telemost. seichannel не поддерживается. videochannel — best effort. +**Telemost:** только vp8channel стабильно проходит. DataChannel удалён из Telemost. seichannel не поддерживается. videochannel - медленно. -**WBStream:** все транспорты кроме datachannel работают. DataChannel в обычном guest flow без выдавания модератора не работает — WB Stream выдаёт токены с `canPublishData=false`, и DC не маршрутизирует данные. +**WBStream:** все транспорты кроме datachannel работают. DataChannel в обычном guest flow без выдавания модератора не работает - WB Stream выдаёт токены с `canPublishData=false`, и DC не маршрутизирует данные. -**Jitsi:** datachannel стабильно проходит — реализован поверх colibri-ws bridge channel и шлёт байты через `EndpointMessage{raw}` broadcast. Подходит для self-hosted и публичных Jitsi Meet инстансов без аутентификации (`https://meet.cryptopro.ru/...`, `https://meet.jit.si/...` и т.п.). Видео-транспорты (vp8channel, seichannel, videochannel) экспонируют sendable VideoTrack через pion PeerConnection после Jingle session-accept, но Jicofo требует дополнительных протокольных шагов (LastN, ReceiverVideoConstraints, source-add) для маршрутизации видео — поэтому они помечены `~` (best effort). +**Jitsi:** datachannel стабильно проходит - реализован поверх colibri-ws bridge channel и шлёт байты через `EndpointMessage{raw}` broadcast. Подходит для self-hosted и публичных Jitsi Meet инстансов без аутентификации (`https://meet.cryptopro.ru/...`, `https://meet.jit.si/...` и т.п.). Видео-транспорты (vp8channel, seichannel, videochannel) экспонируют sendable VideoTrack через pion PeerConnection после Jingle session-accept, но Jicofo требует дополнительных протокольных шагов (LastN, ReceiverVideoConstraints, source-add) для маршрутизации видео - поэтому они помечены `~` . -**Jitsi + seichannel — отдельная оговорка.** SEI NAL-юниты идут пассажиром в H.264 видеопотоке, а Jicofo на self-hosted инстансах (например `meet.cryptopro.ru`) периодически режет/откладывает upstream видео когда ресивера в комнате формально нет — для нас это выглядит как `seichannel ack timeout` при формально живом PeerConnection. В steady-state транспорт работает, но e2e матрица помечает его `Unstable` (флаппит): зелёного и красного результата в CI достаточно, тест suite на этом не валится. Для надёжной передачи данных через jitsi предпочтительнее `datachannel` или `vp8channel`. +**Jitsi + seichannel — отдельная оговорка.** SEI NAL-юниты идут пассажиром в H.264 видеопотоке, а Jicofo на self-hosted инстансах (например `meet.cryptopro.ru`) периодически режет/откладывает upstream видео когда ресивера в комнате формально нет - для нас это выглядит как `seichannel ack timeout` при формально живом PeerConnection. В steady-state транспорт работает, но e2e матрица помечает его `Unstable` (флаппит): зелёного и красного результата в CI достаточно, тест suite на этом не валится. Для надёжной передачи данных через jitsi предпочтительнее `datachannel` или `vp8channel`. **Рекомендуемая комбинация: `jitsi + datachannel`** — стабильно работает на любом self-hosted или публичном Jitsi Meet (например `meet.cryptopro.ru`), не требует регистрации, простая руму создания. Альтернатива: `wbstream + vp8channel` — стабильно для коммерческих сценариев, не требует специальных прав. diff --git a/internal/crypto/chacha.go b/internal/crypto/chacha.go index 93a8425..744a64a 100644 --- a/internal/crypto/chacha.go +++ b/internal/crypto/chacha.go @@ -4,8 +4,10 @@ package crypto import ( "crypto/cipher" "crypto/rand" + "encoding/binary" "errors" "fmt" + "sync/atomic" "golang.org/x/crypto/chacha20poly1305" ) @@ -20,9 +22,30 @@ var ( ErrCiphertextTooShort = errors.New("ciphertext too short") ) -// Cipher provides AEAD encryption and decryption using ChaCha20-Poly1305. +// nonceSaltSize is the prefix of the XChaCha20 24-byte nonce that is +// chosen randomly once at Cipher construction. The remaining 8 bytes +// hold a monotonic counter incremented on every Encrypt call. With a +// fresh per-Cipher salt and a 64-bit counter, the (salt, counter) pair +// is unique for every encryption operation as long as the same Cipher +// instance is used (>10^19 messages before counter wrap). +const nonceSaltSize = chacha20poly1305.NonceSizeX - 8 + +// Cipher provides AEAD encryption and decryption using XChaCha20-Poly1305. +// +// Nonces are generated deterministically as `salt || counter` where the +// salt is a per-instance random 16-byte prefix and the counter is a +// monotonic 64-bit suffix. This avoids the syscall + global lock that +// crypto/rand.Read would impose on every encrypt call, which dominated +// the data-plane CPU profile under sustained throughput. +// +// The wire format is unchanged: ciphertexts are still [24-byte nonce] +// [encrypted payload][16-byte tag], so a peer using the previous +// random-nonce implementation can decrypt messages produced here, and +// vice versa. type Cipher struct { - aead cipher.AEAD + aead cipher.AEAD + salt [nonceSaltSize]byte + counter atomic.Uint64 } // NewCipher creates a new Cipher instance with the given 32-byte key. @@ -37,22 +60,43 @@ func NewCipher(keyStr string) (*Cipher, error) { return nil, fmt.Errorf("failed to create aead: %w", err) } - return &Cipher{aead: aead}, nil -} - -// Encrypt encrypts plaintext and prepends a random nonce. -func (c *Cipher) Encrypt(plaintext []byte) ([]byte, error) { - nonce := make([]byte, c.aead.NonceSize()) - if _, err := rand.Read(nonce); err != nil { - return nil, fmt.Errorf("failed to generate nonce: %w", err) + c := &Cipher{aead: aead} + if _, err := rand.Read(c.salt[:]); err != nil { + return nil, fmt.Errorf("failed to seed nonce salt: %w", err) } - // Seal appends the ciphertext to the nonce - return c.aead.Seal(nonce, nonce, plaintext, nil), nil + return c, nil +} + +// Encrypt encrypts plaintext and prepends a deterministic per-message +// nonce (random per-instance salt + monotonic counter). +// +// Allocates a single output buffer sized exactly for the resulting +// ciphertext, so AEAD.Seal does not have to grow the slice. +func (c *Cipher) Encrypt(plaintext []byte) ([]byte, error) { + nonceSize := c.aead.NonceSize() + overhead := c.aead.Overhead() + + // One alloc, sized for the full output: nonce || sealed(plaintext+tag). + out := make([]byte, nonceSize, nonceSize+len(plaintext)+overhead) + + copy(out[:nonceSaltSize], c.salt[:]) + binary.BigEndian.PutUint64(out[nonceSaltSize:nonceSize], c.counter.Add(1)) + + return c.aead.Seal(out, out[:nonceSize], plaintext, nil), nil } // Decrypt decrypts ciphertext that has a nonce prepended. func (c *Cipher) Decrypt(ciphertext []byte) ([]byte, error) { + return c.DecryptInto(nil, ciphertext) +} + +// DecryptInto appends the decrypted plaintext to dst (which can be nil) +// and returns the extended slice. Pass a buffer with enough spare +// capacity from a sync.Pool to avoid per-call allocations on the hot +// path: the AEAD primitive will write the plaintext in place when +// cap(dst) >= len(ciphertext) - WireOverhead. +func (c *Cipher) DecryptInto(dst, ciphertext []byte) ([]byte, error) { nonceSize := c.aead.NonceSize() if len(ciphertext) < nonceSize { return nil, ErrCiphertextTooShort @@ -61,7 +105,7 @@ func (c *Cipher) Decrypt(ciphertext []byte) ([]byte, error) { nonce := ciphertext[:nonceSize] encrypted := ciphertext[nonceSize:] - res, err := c.aead.Open(nil, nonce, encrypted, nil) + res, err := c.aead.Open(dst, nonce, encrypted, nil) if err != nil { return nil, fmt.Errorf("failed to decrypt: %w", err) } diff --git a/internal/crypto/chacha_test.go b/internal/crypto/chacha_test.go index a0ed9cc..84c151a 100644 --- a/internal/crypto/chacha_test.go +++ b/internal/crypto/chacha_test.go @@ -2,8 +2,11 @@ package crypto import ( "bytes" + "crypto/rand" "errors" "testing" + + "golang.org/x/crypto/chacha20poly1305" ) func TestNewCipherRejectsWrongKeySize(t *testing.T) { @@ -48,3 +51,117 @@ func TestDecryptRejectsShortCiphertext(t *testing.T) { t.Fatalf("Decrypt() error = %v, want %v", err, ErrCiphertextTooShort) } } + +// TestEncryptUniqueNonces ensures the deterministic-nonce optimisation +// never repeats a nonce within a single Cipher instance: the salt is +// fixed but the counter must move every call. +func TestEncryptUniqueNonces(t *testing.T) { + c, err := NewCipher("01234567890123456789012345678901") + if err != nil { + t.Fatalf("NewCipher() error = %v", err) + } + + const iterations = 1024 + nonceSize := chacha20poly1305.NonceSizeX + seen := make(map[string]struct{}, iterations) + for i := range iterations { + ct, err := c.Encrypt([]byte("payload")) + if err != nil { + t.Fatalf("Encrypt() error = %v", err) + } + nonce := string(ct[:nonceSize]) + if _, dup := seen[nonce]; dup { + t.Fatalf("nonce repeated at iteration %d", i) + } + seen[nonce] = struct{}{} + } +} + +// TestCipherInstancesDistinctSalts confirms two Cipher instances built +// from the same key still produce different nonce salts, so they cannot +// collide on (key, nonce) even at counter==1. +func TestCipherInstancesDistinctSalts(t *testing.T) { + const key = "01234567890123456789012345678901" + a, err := NewCipher(key) + if err != nil { + t.Fatalf("NewCipher(a) error = %v", err) + } + b, err := NewCipher(key) + if err != nil { + t.Fatalf("NewCipher(b) error = %v", err) + } + if bytes.Equal(a.salt[:], b.salt[:]) { + t.Fatal("two Cipher instances produced the same nonce salt") + } +} + +// TestDecryptAcceptsLegacyRandomNonce verifies the new Cipher can still +// decrypt ciphertexts produced by the previous fully-random-nonce +// implementation. This guarantees rolling upgrade safety: a peer running +// the old code can talk to one running the new code in either direction. +func TestDecryptAcceptsLegacyRandomNonce(t *testing.T) { + const key = "01234567890123456789012345678901" + c, err := NewCipher(key) + if err != nil { + t.Fatalf("NewCipher() error = %v", err) + } + + // Reproduce the legacy encryption path inline (random nonce, no salt + // or counter) using the same AEAD primitive. + aead, err := chacha20poly1305.NewX([]byte(key)) + if err != nil { + t.Fatalf("aead error = %v", err) + } + nonce := make([]byte, aead.NonceSize()) + if _, err := rand.Read(nonce); err != nil { + t.Fatalf("rand.Read() error = %v", err) + } + plaintext := []byte("legacy peer payload") + legacy := aead.Seal(nonce, nonce, plaintext, nil) + + got, err := c.Decrypt(legacy) + if err != nil { + t.Fatalf("Decrypt(legacy) error = %v", err) + } + if !bytes.Equal(got, plaintext) { + t.Fatalf("Decrypt(legacy) = %q, want %q", got, plaintext) + } +} + +// BenchmarkEncrypt covers the data-plane hot path: many encrypts of a +// typical smux frame size. Run with `go test -bench=Encrypt +// -benchmem ./internal/crypto` to compare against the previous +// implementation. +func BenchmarkEncrypt(b *testing.B) { + c, err := NewCipher("01234567890123456789012345678901") + if err != nil { + b.Fatalf("NewCipher() error = %v", err) + } + payload := bytes.Repeat([]byte{0xab}, 12*1024) + b.ResetTimer() + b.SetBytes(int64(len(payload))) + for range b.N { + if _, err := c.Encrypt(payload); err != nil { + b.Fatalf("Encrypt() error = %v", err) + } + } +} + +func BenchmarkDecrypt(b *testing.B) { + c, err := NewCipher("01234567890123456789012345678901") + if err != nil { + b.Fatalf("NewCipher() error = %v", err) + } + payload := bytes.Repeat([]byte{0xab}, 12*1024) + ct, err := c.Encrypt(payload) + if err != nil { + b.Fatalf("Encrypt() error = %v", err) + } + b.ResetTimer() + b.SetBytes(int64(len(payload))) + for range b.N { + if _, err := c.Decrypt(ct); err != nil { + b.Fatalf("Decrypt() error = %v", err) + } + } +} diff --git a/internal/e2e/local_soak_test.go b/internal/e2e/local_soak_test.go new file mode 100644 index 0000000..2f9fbcb --- /dev/null +++ b/internal/e2e/local_soak_test.go @@ -0,0 +1,380 @@ +package e2e + +import ( + "bufio" + "bytes" + "context" + "errors" + "flag" + "fmt" + "io" + "net" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/openlibrecommunity/olcrtc/internal/client" + "github.com/openlibrecommunity/olcrtc/internal/server" +) + +// Local throughput soak: pump as much traffic as the selected transport +// can sustain, locally, for an arbitrary duration. +// +// The tunnel is built on the in-memory carrier (no real provider, no +// network), so this measures the upper bound of what the +// SOCKS+muxconn+transport stack can do on this machine. Useful to: +// +// - leave running for hours and watch for goroutine / memory growth +// - reproduce slow-leak corruption with the byte-pattern verifier +// - get a feel for raw transport throughput before touching real WebRTC +// +// Quick start: +// +// go test -count=1 -v ./internal/e2e \ +// -run '^TestLocalThroughputSoak$' \ +// -olcrtc.local-soak \ +// -olcrtc.local-soak-duration=12h \ +// -timeout=13h +// +// The test is gated by -olcrtc.local-soak so it never runs in regular CI. + +var ( + localSoakEnabled = flag.Bool( //nolint:gochecknoglobals // package-level state intentional + "olcrtc.local-soak", + false, + "run TestLocalThroughputSoak (long-running local throughput pump)", + ) + localSoakDuration = flag.Duration( //nolint:gochecknoglobals // package-level state intentional + "olcrtc.local-soak-duration", + 30*time.Second, + "how long to keep pumping traffic (e.g. 12h, 30m, 90s)", + ) + localSoakTransport = flag.String( //nolint:gochecknoglobals // package-level state intentional + "olcrtc.local-soak-transport", + transportData, + "transport to pump through: datachannel|videochannel|seichannel|vp8channel", + ) + localSoakChunk = flag.Int( //nolint:gochecknoglobals // package-level state intentional + "olcrtc.local-soak-chunk", + 64*1024, + "write/read chunk size in bytes", + ) + localSoakProgress = flag.Duration( //nolint:gochecknoglobals // package-level state intentional + "olcrtc.local-soak-progress", + 30*time.Second, + "how often to log throughput progress lines", + ) + localSoakVerify = flag.Bool( //nolint:gochecknoglobals // package-level state intentional + "olcrtc.local-soak-verify", + true, + "verify echoed bytes match the sent pattern (slower, but catches corruption)", + ) +) + +var errLocalSoakPayloadMismatch = errors.New("local soak payload mismatch") + +// TestLocalThroughputSoak pumps a deterministic byte pattern through a +// locally-built tunnel for -olcrtc.local-soak-duration and reports +// throughput periodically. Both writer and reader run concurrently on the +// same SOCKS connection; with the loopback echo server on the far end +// each byte gets written, tunneled across, echoed back, and verified. +func TestLocalThroughputSoak(t *testing.T) { + if !*localSoakEnabled { + t.Skip("local soak disabled; pass -olcrtc.local-soak to enable") + } + if *localSoakDuration <= 0 { + t.Skip("local soak duration is zero") + } + if *localSoakChunk <= 0 { + t.Fatalf("invalid -olcrtc.local-soak-chunk=%d", *localSoakChunk) + } + + // Connection setup itself can be slow (first WebRTC handshake on + // some transports), so don't fold it into the duration budget. + const setupBudget = 30 * time.Second + + t.Logf("[soak] transport=%s duration=%s chunk=%d verify=%t progress=%s", + *localSoakTransport, *localSoakDuration, *localSoakChunk, + *localSoakVerify, *localSoakProgress) + + rt := startLocalSoakTunnel(t, *localSoakTransport) + echoAddr := startEchoServer(t) + + conn, err := connectViaSOCKSWithin(rt.socksAddr, echoAddr, setupBudget) + if err != nil { + t.Fatalf("connect via SOCKS: %v", err) + } + defer func() { _ = conn.Close() }() + + pumpCtx, cancelPump := context.WithTimeout(context.Background(), *localSoakDuration) + defer cancelPump() + + stats := runLocalSoakPump(pumpCtx, t, conn, *localSoakChunk, *localSoakVerify, *localSoakProgress) + + if stats.sent == 0 || stats.recv == 0 { + t.Fatalf("no traffic moved: sent=%d recv=%d", stats.sent, stats.recv) + } + if stats.err != nil && !isExpectedShutdownErr(stats.err) { + t.Fatalf("pump error: %v", stats.err) + } + + t.Logf("[soak] DONE transport=%s elapsed=%s sent=%s recv=%s send=%s/s recv=%s/s", + *localSoakTransport, + stats.elapsed.Round(time.Second), + humanBytes(stats.sent), + humanBytes(stats.recv), + humanBytes(int64(float64(stats.sent)/stats.elapsed.Seconds())), + humanBytes(int64(float64(stats.recv)/stats.elapsed.Seconds())), + ) +} + +// startLocalSoakTunnel mirrors startTunnel but lets the caller pick the +// transport (the original is hard-coded to datachannel). +func startLocalSoakTunnel(t *testing.T, transportName string) *tunnelRuntime { + t.Helper() + + carrierName, room := registerMemoryCarrier(t) + ctx, cancel := context.WithCancel(context.Background()) + t.Cleanup(cancel) + socksAddr := freeLocalAddr(ctx, t) + options := e2eTransportOptions(transportName) + + serverErr := make(chan error, 1) + go func() { + serverErr <- server.Run(ctx, server.Config{ + Transport: transportName, + TransportOptions: options, + Carrier: carrierName, + RoomURL: testRoom, + KeyHex: testKeyHex, + DNSServer: localDNSServer, + }) + }() + room.waitConnected(t, 1) + + ready := make(chan struct{}) + clientErr := make(chan error, 1) + go func() { + clientErr <- client.RunWithReady(ctx, client.Config{ + Transport: transportName, + TransportOptions: options, + Carrier: carrierName, + RoomURL: testRoom, + KeyHex: testKeyHex, + DeviceID: testClientDeviceID, + LocalAddr: socksAddr, + DNSServer: localDNSServer, + }, func() { close(ready) }) + }() + waitForReady(t, ready) + + return &tunnelRuntime{ + socksAddr: socksAddr, + room: room, + cancel: cancel, + serverErr: serverErr, + clientErr: clientErr, + stopWait: 3 * time.Second, + } +} + +type localSoakStats struct { + sent, recv int64 + elapsed time.Duration + err error +} + +// runLocalSoakPump runs a writer goroutine and a reader goroutine over the +// same conn until ctx expires, periodically logging progress. Bytes are +// counted atomically so the progress logger sees a coherent snapshot. +func runLocalSoakPump( + ctx context.Context, + t *testing.T, + conn net.Conn, + chunkSize int, + verify bool, + progressEvery time.Duration, +) localSoakStats { + t.Helper() + + var sent, recv atomic.Int64 + start := time.Now() + + progressDone := make(chan struct{}) + go runLocalSoakProgress(ctx, t, &sent, &recv, start, progressEvery, progressDone) + + var ( + wg sync.WaitGroup + errOnce sync.Once + pumpErr error + ) + recordErr := func(err error) { + if err == nil { + return + } + errOnce.Do(func() { pumpErr = err }) + } + + wg.Add(2) + go pumpWriter(ctx, conn, chunkSize, &sent, &wg, recordErr) + go pumpReader(ctx, conn, chunkSize, verify, &recv, &wg, recordErr) + + <-ctx.Done() + // Force-close the conn so both pumps unblock from any in-flight I/O. + // SetDeadline-in-the-past is the canonical kick. + _ = conn.SetDeadline(time.Unix(1, 0)) + wg.Wait() + <-progressDone + + return localSoakStats{ + sent: sent.Load(), + recv: recv.Load(), + elapsed: time.Since(start), + err: pumpErr, + } +} + +// runLocalSoakProgress emits periodic throughput lines until ctx fires. +func runLocalSoakProgress( + ctx context.Context, + t *testing.T, + sent, recv *atomic.Int64, + start time.Time, + progressEvery time.Duration, + done chan<- struct{}, +) { + t.Helper() + defer close(done) + if progressEvery <= 0 { + return + } + ticker := time.NewTicker(progressEvery) + defer ticker.Stop() + var lastSent, lastRecv int64 + lastTime := start + for { + select { + case <-ctx.Done(): + return + case now := <-ticker.C: + s, r := sent.Load(), recv.Load() + dt := now.Sub(lastTime).Seconds() + instSendRate := int64(float64(s-lastSent) / dt) + instRecvRate := int64(float64(r-lastRecv) / dt) + t.Logf("[soak] elapsed=%s sent=%s recv=%s tx=%s/s rx=%s/s", + now.Sub(start).Round(time.Second), + humanBytes(s), humanBytes(r), + humanBytes(instSendRate), humanBytes(instRecvRate), + ) + lastSent, lastRecv = s, r + lastTime = now + } + } +} + +// pumpWriter pushes a deterministic byte pattern through conn until ctx +// expires or the connection errors out. +func pumpWriter( + ctx context.Context, + conn net.Conn, + chunkSize int, + sent *atomic.Int64, + wg *sync.WaitGroup, + recordErr func(error), +) { + defer wg.Done() + buf := make([]byte, chunkSize) + var off int64 + for ctx.Err() == nil { + fillPattern(buf, off) + if _, err := conn.Write(buf); err != nil { + if ctx.Err() == nil { + recordErr(fmt.Errorf("write at %d: %w", off, err)) + } + return + } + off += int64(chunkSize) + sent.Add(int64(chunkSize)) + } +} + +// pumpReader reads echoed bytes back, optionally verifying them against +// the deterministic pattern that pumpWriter produced at the same offset. +func pumpReader( + ctx context.Context, + conn net.Conn, + chunkSize int, + verify bool, + recv *atomic.Int64, + wg *sync.WaitGroup, + recordErr func(error), +) { + defer wg.Done() + rdr := bufio.NewReader(conn) + echoed := make([]byte, chunkSize) + want := make([]byte, chunkSize) + var off int64 + for ctx.Err() == nil { + if _, err := io.ReadFull(rdr, echoed); err != nil { + if ctx.Err() == nil { + recordErr(fmt.Errorf("read at %d: %w", off, err)) + } + return + } + if verify { + fillPattern(want, off) + if !bytes.Equal(echoed, want) { + recordErr(fmt.Errorf("%w at offset %d", errLocalSoakPayloadMismatch, off)) + return + } + } + off += int64(chunkSize) + recv.Add(int64(chunkSize)) + } +} + +// isExpectedShutdownErr filters errors that just mean "we asked the conn +// to stop" — deadline expirations from our SetDeadline kick, EOF from the +// peer half-closing, etc. +func isExpectedShutdownErr(err error) bool { + if err == nil { + return true + } + if errors.Is(err, io.EOF) || errors.Is(err, io.ErrUnexpectedEOF) { + return true + } + if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { + return true + } + var nerr net.Error + if errors.As(err, &nerr) && nerr.Timeout() { + return true + } + if errors.Is(err, net.ErrClosed) { + return true + } + return false +} + +// humanBytes formats a byte count with a binary-unit suffix. +func humanBytes(n int64) string { + const ( + kib = 1 << 10 + mib = 1 << 20 + gib = 1 << 30 + tib = 1 << 40 + ) + switch { + case n >= tib: + return fmt.Sprintf("%.2f TiB", float64(n)/float64(tib)) + case n >= gib: + return fmt.Sprintf("%.2f GiB", float64(n)/float64(gib)) + case n >= mib: + return fmt.Sprintf("%.2f MiB", float64(n)/float64(mib)) + case n >= kib: + return fmt.Sprintf("%.2f KiB", float64(n)/float64(kib)) + default: + return fmt.Sprintf("%d B", n) + } +} diff --git a/internal/muxconn/conn.go b/internal/muxconn/conn.go index f2d3856..9fdb4cc 100644 --- a/internal/muxconn/conn.go +++ b/internal/muxconn/conn.go @@ -7,8 +7,9 @@ // on the peer. smux operates on a pure byte stream (header + payload may be // glued or split across reads). We bridge by: // -// - Treating each Push as an opaque chunk appended to an internal byte -// buffer that Read drains in arbitrary slices. +// - Treating each Push as an opaque chunk handed off via a channel that +// Read drains in arbitrary slices, retaining any tail bytes that did +// not fit the caller's buffer for the next Read. // - Letting smux's sendLoop call Write once per frame; we encrypt and hand // the whole buffer to the link as a single message. Length boundaries // are preserved end-to-end by the transport (KCP length-prefix framing @@ -21,6 +22,7 @@ import ( "io" "runtime" "sync" + "sync/atomic" "time" "github.com/openlibrecommunity/olcrtc/internal/crypto" @@ -31,83 +33,211 @@ import ( // ErrClosed is returned from Read/Write after the conn has been closed. var ErrClosed = errors.New("muxconn: closed") +const ( + // inboundQueue is the buffered capacity of the Push -> Read pipeline. + // It absorbs short Read stalls without applying back-pressure to the + // transport callback. Frames are typically smux-sized (well under + // 16 KiB), so 256 amounts to a few MiB of in-flight data, which is + // enough for sustained throughput on every transport we have without + // unbounded growth on a stuck reader. + inboundQueue = 256 + + // pooledFrameCap is the capacity each pooled plaintext buffer is born + // with. It is sized to fit the largest smux frame any of our + // transports will deliver after AEAD overhead is stripped (datachannel + // caps at 12 KiB on the wire, vp8channel at 60 KiB; we round up to + // give Open room to write in place without growing the slice). + pooledFrameCap = 64 * 1024 +) + +// frameBufPool recycles plaintext buffers between Push (decrypts a wire +// frame into a buffer) and Read (consumes the buffer fully then returns +// it). It is global so all Conn instances share the same hot cache — +// most clients in the same process talk to a handful of peers, and +// per-Conn pools fragment the warm set unnecessarily. +var frameBufPool = sync.Pool{ //nolint:gochecknoglobals // intentional process-wide buffer pool + New: func() any { + b := make([]byte, 0, pooledFrameCap) + return &b + }, +} + +func acquireFrameBuf() *[]byte { + bp := frameBufPool.Get().(*[]byte) //nolint:forcetypeassert // pool only ever holds *[]byte + *bp = (*bp)[:0] + return bp +} + +func releaseFrameBuf(bp *[]byte) { + if bp == nil { + return + } + // Drop oversized buffers so a one-off huge frame can't poison the + // pool's working set forever. + if cap(*bp) > pooledFrameCap*2 { + return + } + *bp = (*bp)[:0] + frameBufPool.Put(bp) +} + // Conn is an io.ReadWriteCloser over a [transport.Transport] with optional AEAD wrapping. +// +// Push produces decrypted plaintext frames into an internal channel; Read +// drains the channel and slices each frame across as many caller buffers +// as needed. The hot path is lock-free: a single producer (the transport +// callback) and a single consumer (smux's read loop) communicate via a +// buffered channel without any cond/mutex ping-pong. +// +// Plaintext buffers are recycled through frameBufPool: Push borrows a +// buffer to decrypt into, ships it through the channel, and Read returns +// the buffer to the pool once its caller has consumed all the bytes. type Conn struct { ln transport.Transport send func([]byte) error cipher *crypto.Cipher - mu sync.Mutex - cond *sync.Cond - buf []byte - closed bool + in chan *[]byte + closeOnce sync.Once + closeCh chan struct{} + closed atomic.Bool + + // leftoverBuf holds the pool buffer whose tail is still in + // `leftover`. When `leftover` empties we return leftoverBuf to the + // pool and clear both fields. Touched only by Read. + leftoverBuf *[]byte + leftover []byte } // New wires a Conn over the given transport. Push must be set as the // transport's OnData callback before this conn is used. func New(ln transport.Transport, cipher *crypto.Cipher) *Conn { - c := &Conn{ln: ln, send: ln.Send, cipher: cipher} - c.cond = sync.NewCond(&c.mu) - return c + return &Conn{ + ln: ln, + send: ln.Send, + cipher: cipher, + in: make(chan *[]byte, inboundQueue), + closeCh: make(chan struct{}), + } } // NewPeer wires a Conn whose writes are addressed to a specific transport peer. func NewPeer(ln transport.PeerTransport, cipher *crypto.Cipher, peerID string) *Conn { - c := &Conn{ + return &Conn{ ln: ln, send: func(data []byte) error { return ln.SendTo(peerID, data) }, - cipher: cipher, + cipher: cipher, + in: make(chan *[]byte, inboundQueue), + closeCh: make(chan struct{}), } - c.cond = sync.NewCond(&c.mu) - return c -} - -// Reset clears any buffered inbound bytes, re-arms a closed conn for writes, -// and unblocks pending Reads so the smux session on top of it exits cleanly. -// Use it when the link stays up but the peer's smux session has been rebuilt: -// the inbound byte stream (now indistinguishable random-looking data) must be -// parsed by the fresh smux state, not the old one. -func (c *Conn) Reset() { - c.mu.Lock() - c.buf = nil - c.closed = false - c.cond.Broadcast() - c.mu.Unlock() } // Push hands an encrypted wire payload (one OnData event) to the conn. +// +// On the producer side: borrow a pooled plaintext buffer, decrypt into +// it, then either deliver via the inbound channel or, if the caller has +// Close'd, return the buffer to the pool. Blocking forever on a wedged +// reader would wedge the transport callback and trip its watchdog, so we +// also bail on closeCh. func (c *Conn) Push(ciphertext []byte) { - pt, err := c.cipher.Decrypt(ciphertext) + bufPtr := acquireFrameBuf() + pt, err := c.cipher.DecryptInto(*bufPtr, ciphertext) if err != nil { + releaseFrameBuf(bufPtr) logger.Debugf("muxconn: decrypt failed, dropping frame: %v", err) return } - c.mu.Lock() - defer c.mu.Unlock() - if c.closed { + *bufPtr = pt + if c.closed.Load() { + releaseFrameBuf(bufPtr) return } - c.buf = append(c.buf, pt...) - c.cond.Broadcast() + select { + case c.in <- bufPtr: + case <-c.closeCh: + releaseFrameBuf(bufPtr) + } } -// Read implements io.Reader. Blocks until at least one byte is available. +// Read implements io.Reader. Blocks until at least one byte is available; +// after that, drains additional ready frames non-blockingly to fill p, so +// a single Read can absorb several queued frames in one go. This matches +// the prior cond/append-based implementation's concatenation behaviour +// and lets smux's bufio reader pull large chunks at a time. func (c *Conn) Read(p []byte) (int, error) { - c.mu.Lock() - defer c.mu.Unlock() - for !c.closed && len(c.buf) == 0 { - c.cond.Wait() + if len(p) == 0 { + return 0, nil } - if len(c.buf) == 0 { - return 0, io.EOF + if len(c.leftover) == 0 { + bufPtr, ok := c.takeFrame() + if !ok { + return 0, io.EOF + } + c.leftoverBuf = bufPtr + c.leftover = *bufPtr + } + n := copy(p, c.leftover) + c.leftover = c.leftover[n:] + c.recycleIfDrained() + + // Greedily pull additional frames already sitting in the queue, + // without blocking. This keeps the channel from accumulating a + // backlog when the consumer asks for a large buffer. + for n < len(p) && len(c.leftover) == 0 { + select { + case bufPtr, ok := <-c.in: + if !ok { + return n, nil + } + data := *bufPtr + m := copy(p[n:], data) + n += m + if m < len(data) { + c.leftoverBuf = bufPtr + c.leftover = data[m:] + } else { + releaseFrameBuf(bufPtr) + } + default: + return n, nil + } } - n := copy(p, c.buf) - c.buf = c.buf[n:] return n, nil } +// takeFrame blocks until a frame is available or the conn is closed. +// On a clean close it still drains any frame that landed before the +// close signal won the race, so a peer that shuts us down right after a +// final write doesn't lose data. +func (c *Conn) takeFrame() (*[]byte, bool) { + select { + case bufPtr, ok := <-c.in: + if !ok { + return nil, false + } + return bufPtr, true + case <-c.closeCh: + select { + case bufPtr, ok := <-c.in: + if !ok { + return nil, false + } + return bufPtr, true + default: + return nil, false + } + } +} + +func (c *Conn) recycleIfDrained() { + if len(c.leftover) == 0 && c.leftoverBuf != nil { + releaseFrameBuf(c.leftoverBuf) + c.leftoverBuf = nil + } +} + // Write encrypts p and ships it to the link as a single message. Blocks while // the link signals back-pressure. func (c *Conn) Write(p []byte) (int, error) { @@ -120,7 +250,7 @@ func (c *Conn) Write(p []byte) (int, error) { slowPollDelay = 2 * time.Millisecond ) for attempt := 0; ; attempt++ { - if c.isClosed() { + if c.closed.Load() { return 0, ErrClosed } if c.ln.CanSend() { @@ -145,18 +275,9 @@ func (c *Conn) Write(p []byte) (int, error) { // Close unblocks any pending Read with io.EOF. func (c *Conn) Close() error { - c.mu.Lock() - defer c.mu.Unlock() - if c.closed { - return nil - } - c.closed = true - c.cond.Broadcast() + c.closeOnce.Do(func() { + c.closed.Store(true) + close(c.closeCh) + }) return nil } - -func (c *Conn) isClosed() bool { - c.mu.Lock() - defer c.mu.Unlock() - return c.closed -} diff --git a/mobile/mobile_test.go b/mobile/mobile_test.go index a540d87..1a2f484 100644 --- a/mobile/mobile_test.go +++ b/mobile/mobile_test.go @@ -53,7 +53,7 @@ func resetMobileGlobals(t *testing.T) { var clientRunWithReady = runClientWithReady //nolint:gochecknoglobals // package-level state intentional -const testRoom = "room" +const testRoomID = "room" var ( errMobileCheckFailed = errors.New("check failed") @@ -130,7 +130,7 @@ func TestNormalizeBuildRoomAndClamp(t *testing.T) { if got := buildRoomURL("telemost", "abc"); got != "abc" { t.Fatalf("telemost room URL = %q", got) } - if got := buildRoomURL(carrierWBStream, testRoom); got != testRoom { + if got := buildRoomURL(carrierWBStream, testRoomID); got != testRoomID { t.Fatalf("wbstream room URL = %q", got) } @@ -142,23 +142,23 @@ func TestNormalizeBuildRoomAndClamp(t *testing.T) { func TestStartValidation(t *testing.T) { resetMobileGlobals(t) - if err := startWithConfig("", dataTransport, testRoom, "client", "key", 1080, "", "", mobileConfig{}); !errors.Is(err, errCarrierRequired) { //nolint:lll // long test description + if err := startWithConfig("", dataTransport, testRoomID, "client", "key", 1080, "", "", mobileConfig{}); !errors.Is(err, errCarrierRequired) { //nolint:lll // long test description t.Fatalf("startWithConfig(missing carrier) = %v", err) } if err := startWithConfig("telemost", dataTransport, "", "client", "key", 1080, "", "", mobileConfig{}); !errors.Is(err, errRoomIDRequired) { //nolint:lll // long test description t.Fatalf("startWithConfig(missing room) = %v", err) } - if err := startWithConfig("jitsi", dataTransport, testRoom, "", "key", 1080, "", "", mobileConfig{}); !errors.Is(err, errClientIDRequired) { //nolint:lll // long test description + if err := startWithConfig("jitsi", dataTransport, testRoomID, "", "key", 1080, "", "", mobileConfig{}); !errors.Is(err, errClientIDRequired) { //nolint:lll // long test description t.Fatalf("startWithConfig(missing client) = %v", err) } - if err := startWithConfig("jitsi", dataTransport, testRoom, "client", "", 1080, "", "", mobileConfig{}); !errors.Is(err, errKeyHexRequired) { //nolint:lll // long test description + if err := startWithConfig("jitsi", dataTransport, testRoomID, "client", "", 1080, "", "", mobileConfig{}); !errors.Is(err, errKeyHexRequired) { //nolint:lll // long test description t.Fatalf("startWithConfig(missing key) = %v", err) } mu.Lock() cancel = func() {} mu.Unlock() - if err := startWithConfig("jitsi", dataTransport, testRoom, "client", "key", 1080, "", "", mobileConfig{}); !errors.Is(err, errAlreadyRunning) { //nolint:lll // long test description + if err := startWithConfig("jitsi", dataTransport, testRoomID, "client", "key", 1080, "", "", mobileConfig{}); !errors.Is(err, errAlreadyRunning) { //nolint:lll // long test description t.Fatalf("startWithConfig(running) = %v", err) } resetMobileGlobals(t) @@ -176,7 +176,7 @@ func TestStartWithInjectedRunnerLifecycle(t *testing.T) { runClientWithReady = func(ctx context.Context, cfg client.Config, onReady func()) error { opts, _ := cfg.TransportOptions.(vp8channel.Options) if cfg.Transport != dataTransport || cfg.Carrier != "jitsi" || - cfg.RoomURL != testRoom || cfg.DeviceID != "client" || cfg.LocalAddr != "0.0.0.0:1080" || + cfg.RoomURL != testRoomID || cfg.DeviceID != "client" || cfg.LocalAddr != "0.0.0.0:1080" || cfg.DNSServer != defaultDNSServer || opts.FPS != 60 || opts.BatchSize != 8 || cfg.Liveness.Interval != 2500*time.Millisecond || cfg.Liveness.Timeout != 750*time.Millisecond || @@ -193,7 +193,7 @@ func TestStartWithInjectedRunnerLifecycle(t *testing.T) { return ctx.Err() } - if err := StartWithTransport("jitsi", "dc", testRoom, "client", "key", 1080, "", ""); err != nil { + if err := StartWithTransport("jitsi", "dc", testRoomID, "client", "key", 1080, "", ""); err != nil { t.Fatalf("StartWithTransport() error = %v", err) } if !IsRunning() { @@ -216,7 +216,7 @@ func TestStartUsesDefaultsAndCheckWithInjectedRunner(t *testing.T) { }) runClientWithReady = func(ctx context.Context, cfg client.Config, onReady func()) error { - if cfg.Transport != defaultTransport || cfg.RoomURL != testRoom || + if cfg.Transport != defaultTransport || cfg.RoomURL != testRoomID || cfg.LocalAddr != "127.0.0.1:1081" || cfg.SOCKSUser != "u" || cfg.SOCKSPass != "p" || cfg.Liveness.Interval != control.DefaultInterval || cfg.Liveness.Timeout != control.DefaultTimeout || @@ -229,7 +229,7 @@ func TestStartUsesDefaultsAndCheckWithInjectedRunner(t *testing.T) { return ctx.Err() } - if err := Start("telemost", testRoom, "client", "key", 1081, "u", "p"); err != nil { + if err := Start("telemost", testRoomID, "client", "key", 1081, "u", "p"); err != nil { t.Fatalf("Start() error = %v", err) } if err := WaitReady(100); err != nil { @@ -251,7 +251,7 @@ func TestStartUsesDefaultsAndCheckWithInjectedRunner(t *testing.T) { <-ctx.Done() return nil } - elapsed, err := Check("jitsi", "dc", testRoom, "client", "key", 1082, 100, -1, 999) + elapsed, err := Check("jitsi", "dc", testRoomID, "client", "key", 1082, 100, -1, 999) if err != nil { t.Fatalf("Check() error = %v", err) } @@ -275,7 +275,7 @@ func TestPingPassesLiveness(t *testing.T) { return nil } - _, _ = Ping("jitsi", "dc", testRoom, "client", "key", 1085, 100, "http://127.0.0.1/", 30, 1) + _, _ = Ping("jitsi", "dc", testRoomID, "client", "key", 1085, 100, "http://127.0.0.1/", 30, 1) select { case got := <-seen: if got.Interval != 4000*time.Millisecond || got.Timeout != 1500*time.Millisecond || got.Failures != 6 { @@ -296,7 +296,7 @@ func TestCheckTimeoutAndRunError(t *testing.T) { <-ctx.Done() return nil } - if _, err := Check("telemost", defaultTransport, testRoom, "client", "key", 1083, 1, 30, 1); !errors.Is(err, errStartTimedOut) { //nolint:lll // long test description + if _, err := Check("telemost", defaultTransport, testRoomID, "client", "key", 1083, 1, 30, 1); !errors.Is(err, errStartTimedOut) { //nolint:lll // long test description t.Fatalf("Check(timeout) error = %v, want %v", err, errStartTimedOut) } @@ -304,7 +304,9 @@ func TestCheckTimeoutAndRunError(t *testing.T) { runClientWithReady = func(context.Context, client.Config, func()) error { return want } - if _, err := Check("telemost", defaultTransport, testRoom, "client", "key", 1084, 100, 30, 1); !errors.Is(err, want) { + if _, err := Check( + "telemost", defaultTransport, testRoomID, "client", "key", 1084, 100, 30, 1, + ); !errors.Is(err, want) { t.Fatalf("Check(run error) = %v, want %v", err, want) } } diff --git a/readme.md b/readme.md index 83a8dcd..d2a9945 100644 --- a/readme.md +++ b/readme.md @@ -42,38 +42,6 @@ Community ui client: [alananisimov/olcbox](https://github.com/alananisimov/olcbo [Client subscription format](docs/sub.md) -## Build - -```bash -# install mage first -go install github.com/magefile/mage@latest - -# build cli + ui -mage build - -# build cli only -mage buildCLI - -# build cli with b codec, clones b repo, builds libb.so, compiles with -tags b -mage buildCLIB - -# cross-compile for linux / windows / darwin -mage cross - -# android aar via gomobile -mage mobile - -# container image -mage podman -mage docker - -# lint / test / clean -mage lint -mage test -mage clean - -``` -
---