From 0d9de3588debb04eb9eadfc20517638e9cae6a74 Mon Sep 17 00:00:00 2001 From: zarazaex69 Date: Mon, 11 May 2026 13:49:19 +0300 Subject: [PATCH 1/4] =?UTF-8?q?feat:=20add=20pkg/olcrtc=20=E2=80=94=20publ?= =?UTF-8?q?ic=20library=20API?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Exposes olcrtc as an embeddable Go library via pkg/olcrtc.Session. Two usage modes: - Direct engine: caller supplies Engine+URL+Token, no HTTP auth flow. - Built-in auth: caller supplies Auth+RoomID; the registered auth provider (telemost, jazz, wbstream) resolves credentials internally. Public surface: New(ctx, Config) (*Session, error) Session.Connect / Send / Close / WatchConnection Session.CanSend / SetEndedCallback / SetShouldReconnect RegisterDefaults() — pulls in all built-in engines + auth providers. Also add !pkg/ exception to .gitignore (bare "olcrtc" pattern was shadowing the new directory). Co-Authored-By: Claude Opus 4.7 --- .gitignore | 2 +- pkg/olcrtc/olcrtc.go | 219 ++++++++++++++++++++++++++++++++++++++ pkg/olcrtc/olcrtc_test.go | 160 ++++++++++++++++++++++++++++ 3 files changed, 380 insertions(+), 1 deletion(-) create mode 100644 pkg/olcrtc/olcrtc.go create mode 100644 pkg/olcrtc/olcrtc_test.go diff --git a/.gitignore b/.gitignore index 1e74a6c..d0b6a3c 100644 --- a/.gitignore +++ b/.gitignore @@ -246,6 +246,6 @@ go.work.sum build/ GEMINI.md code/package-lock.json -olcrtc !cmd/olcrtc/ !cmd/olcrtc/main_test.go +!pkg/ diff --git a/pkg/olcrtc/olcrtc.go b/pkg/olcrtc/olcrtc.go new file mode 100644 index 0000000..20099f1 --- /dev/null +++ b/pkg/olcrtc/olcrtc.go @@ -0,0 +1,219 @@ +// Package olcrtc exposes olcrtc as an embeddable Go library. +// +// Typical usage (direct engine, no service-specific auth): +// +// sess, err := olcrtc.New(ctx, olcrtc.Config{ +// Engine: "livekit", +// URL: "wss://sfu.example/", +// Token: "", +// }) +// +// Typical usage (built-in auth provider): +// +// sess, err := olcrtc.New(ctx, olcrtc.Config{ +// Auth: "telemost", +// RoomID: "", +// }) +// +// In both cases the caller must import the engine and (optionally) auth +// packages it needs via blank imports so their init() functions run: +// +// import ( +// _ "github.com/openlibrecommunity/olcrtc/internal/engine/livekit" +// _ "github.com/openlibrecommunity/olcrtc/internal/auth/telemost" +// ) +// +// Or use [RegisterDefaults] to pull in all built-in implementations at once. +package olcrtc + +import ( + "context" + "errors" + "fmt" + + "github.com/openlibrecommunity/olcrtc/internal/auth" + "github.com/openlibrecommunity/olcrtc/internal/carrier/builtin" + "github.com/openlibrecommunity/olcrtc/internal/engine" +) + +var ( + // ErrAuthOrEngineRequired is returned when neither auth nor engine+URL are supplied. + ErrAuthOrEngineRequired = errors.New("olcrtc: supply either Auth or Engine+URL") + // ErrURLRequired is returned when direct mode is used without a URL. + ErrURLRequired = errors.New("olcrtc: URL required when using direct engine mode") + // ErrTokenRequired is returned when direct mode is used without a token. + ErrTokenRequired = errors.New("olcrtc: Token required when using direct engine mode") +) + +// Config is the input to [New]. +type Config struct { + // --- built-in auth mode --- + // Auth is the name of a registered auth provider ("telemost", "jazz", "wbstream"). + // When set, RoomID is forwarded to the provider as the room reference. + Auth string + RoomID string + + // --- direct engine mode (Auth == "") --- + // Engine selects the SFU protocol ("livekit", "goolom", "salutejazz"). + // Defaults to "livekit" when Auth is empty. + Engine string + URL string + Token string + + // --- common --- + // Name is the display name used when joining the room. + Name string + // DNSServer is an optional custom DNS resolver (e.g. "1.1.1.1:53"). + DNSServer string + // ProxyAddr / ProxyPort configure an outbound SOCKS5 proxy. + ProxyAddr string + ProxyPort int + // OnData, when set, receives incoming data-channel bytes. If nil the + // session operates in video-track / media-only mode. + OnData func([]byte) +} + +// Session is the library handle returned by [New]. +// Connect must be called before Send. Close releases all resources. +type Session struct { + inner engine.Session + // refresh is stored so it survives reconnects via the engine's Refresh hook. + authProvider auth.Provider + authCfg auth.Config +} + +// RegisterDefaults registers all built-in engines and auth providers. +// Call once at program start if you want the full set without manual blank +// imports. Safe to call multiple times. +func RegisterDefaults() { + builtin.Register() +} + +// New creates a Session from cfg. The session is not connected yet; call +// [Session.Connect] when ready. +func New(ctx context.Context, cfg Config) (*Session, error) { + if cfg.Auth != "" { + return newWithAuth(ctx, cfg) + } + return newDirect(ctx, cfg) +} + +func newWithAuth(ctx context.Context, cfg Config) (*Session, error) { + p, err := auth.Get(cfg.Auth) + if err != nil { + return nil, fmt.Errorf("olcrtc: auth provider %q not registered: %w", cfg.Auth, err) + } + + authCfg := auth.Config{ + RoomURL: cfg.RoomID, + Name: cfg.Name, + DNSServer: cfg.DNSServer, + ProxyAddr: cfg.ProxyAddr, + ProxyPort: cfg.ProxyPort, + } + + creds, err := p.Issue(ctx, authCfg) + if err != nil { + return nil, fmt.Errorf("olcrtc: auth issue: %w", err) + } + + engineName := p.Engine() + sess, err := engine.New(ctx, engineName, engine.Config{ + URL: creds.URL, + Token: creds.Token, + Name: cfg.Name, + Extra: creds.Extra, + OnData: cfg.OnData, + DNSServer: cfg.DNSServer, + ProxyAddr: cfg.ProxyAddr, + ProxyPort: cfg.ProxyPort, + Refresh: func(rCtx context.Context) (engine.Credentials, error) { + fresh, freshErr := p.Issue(rCtx, authCfg) + if freshErr != nil { + return engine.Credentials{}, fmt.Errorf("olcrtc: auth refresh: %w", freshErr) + } + return engine.Credentials{URL: fresh.URL, Token: fresh.Token, Extra: fresh.Extra}, nil + }, + }) + if err != nil { + return nil, fmt.Errorf("olcrtc: engine %q: %w", engineName, err) + } + + return &Session{inner: sess, authProvider: p, authCfg: authCfg}, nil +} + +func newDirect(ctx context.Context, cfg Config) (*Session, error) { + if cfg.URL == "" { + return nil, ErrURLRequired + } + if cfg.Token == "" { + return nil, ErrTokenRequired + } + + engineName := cfg.Engine + if engineName == "" { + engineName = "livekit" + } + + sess, err := engine.New(ctx, engineName, engine.Config{ + URL: cfg.URL, + Token: cfg.Token, + Name: cfg.Name, + OnData: cfg.OnData, + DNSServer: cfg.DNSServer, + ProxyAddr: cfg.ProxyAddr, + ProxyPort: cfg.ProxyPort, + }) + if err != nil { + return nil, fmt.Errorf("olcrtc: engine %q: %w", engineName, err) + } + + return &Session{inner: sess}, nil +} + +// Connect establishes the WebRTC connection. Blocks until the data channel (or +// media) is ready, or ctx is cancelled. +func (s *Session) Connect(ctx context.Context) error { + if err := s.inner.Connect(ctx); err != nil { + return fmt.Errorf("connect: %w", err) + } + return nil +} + +// Send queues data for transmission over the data channel. +func (s *Session) Send(data []byte) error { + if err := s.inner.Send(data); err != nil { + return fmt.Errorf("send: %w", err) + } + return nil +} + +// Close tears down the session and releases all resources. +func (s *Session) Close() error { + if err := s.inner.Close(); err != nil { + return fmt.Errorf("close: %w", err) + } + return nil +} + +// WatchConnection monitors the connection and handles reconnects. Run in a +// goroutine alongside Connect. +func (s *Session) WatchConnection(ctx context.Context) { + s.inner.WatchConnection(ctx) +} + +// CanSend reports whether the session is ready to accept outgoing data. +func (s *Session) CanSend() bool { + return s.inner.CanSend() +} + +// SetEndedCallback registers a function called when the session ends +// permanently (after reconnect exhaustion or explicit close). +func (s *Session) SetEndedCallback(cb func(reason string)) { + s.inner.SetEndedCallback(cb) +} + +// SetShouldReconnect controls whether automatic reconnection is attempted. +func (s *Session) SetShouldReconnect(fn func() bool) { + s.inner.SetShouldReconnect(fn) +} diff --git a/pkg/olcrtc/olcrtc_test.go b/pkg/olcrtc/olcrtc_test.go new file mode 100644 index 0000000..a99b699 --- /dev/null +++ b/pkg/olcrtc/olcrtc_test.go @@ -0,0 +1,160 @@ +package olcrtc_test + +import ( + "context" + "errors" + "testing" + + "github.com/openlibrecommunity/olcrtc/internal/auth" + "github.com/openlibrecommunity/olcrtc/internal/engine" + "github.com/openlibrecommunity/olcrtc/pkg/olcrtc" + "github.com/pion/webrtc/v4" +) + +const ( + stubToken = "tok" + stubURL = "wss://x/" +) + +// --- stub engine --- + +type stubSession struct{ connected bool } + +func (s *stubSession) Connect(_ context.Context) error { s.connected = true; return nil } +func (s *stubSession) Send(_ []byte) error { return nil } +func (s *stubSession) Close() error { return nil } +func (s *stubSession) SetReconnectCallback(_ func(*webrtc.DataChannel)) {} +func (s *stubSession) SetShouldReconnect(_ func() bool) {} +func (s *stubSession) SetEndedCallback(_ func(string)) {} +func (s *stubSession) WatchConnection(_ context.Context) {} +func (s *stubSession) CanSend() bool { return s.connected } +func (s *stubSession) GetSendQueue() chan []byte { return nil } +func (s *stubSession) GetBufferedAmount() uint64 { return 0 } +func (s *stubSession) Capabilities() engine.Capabilities { return engine.Capabilities{ByteStream: true} } + +// Compile-time check: stubSession must satisfy engine.Session. +var _ engine.Session = (*stubSession)(nil) + +func registerStubEngine(t *testing.T, name string) { + t.Helper() + engine.Register(name, func(_ context.Context, _ engine.Config) (engine.Session, error) { + return &stubSession{}, nil + }) + t.Cleanup(func() { + // Re-register a no-op so subsequent tests don't break. + engine.Register(name, func(_ context.Context, _ engine.Config) (engine.Session, error) { + return &stubSession{}, nil + }) + }) +} + +// --- stub auth --- + +type stubAuth struct{ engineName string } + +func (a stubAuth) Engine() string { return a.engineName } +func (a stubAuth) Issue(_ context.Context, cfg auth.Config) (auth.Credentials, error) { + if cfg.RoomURL == "" { + return auth.Credentials{}, auth.ErrRoomIDRequired + } + return auth.Credentials{URL: "wss://stub/", Token: stubToken}, nil +} + +func registerStubAuth(t *testing.T, name, engineName string) { + t.Helper() + auth.Register(name, stubAuth{engineName: engineName}) +} + +// --- tests --- + +func TestNewDirect_MissingURL(t *testing.T) { + _, err := olcrtc.New(context.Background(), olcrtc.Config{Token: "tok"}) + if !errors.Is(err, olcrtc.ErrURLRequired) { + t.Fatalf("New(no url) = %v, want ErrURLRequired", err) + } +} + +func TestNewDirect_MissingToken(t *testing.T) { + _, err := olcrtc.New(context.Background(), olcrtc.Config{URL: stubURL}) + if !errors.Is(err, olcrtc.ErrTokenRequired) { + t.Fatalf("New(no token) = %v, want ErrTokenRequired", err) + } +} + +func TestNewDirect_UnknownEngine(t *testing.T) { + _, err := olcrtc.New(context.Background(), olcrtc.Config{ + Engine: "no-such-engine", + URL: stubURL, + Token: stubToken, + }) + if err == nil { + t.Fatal("New(bad engine) error = nil") + } +} + +func TestNewDirect_OK(t *testing.T) { + registerStubEngine(t, "stub-direct") + + sess, err := olcrtc.New(context.Background(), olcrtc.Config{ + Engine: "stub-direct", + URL: stubURL, + Token: stubToken, + }) + if err != nil { + t.Fatalf("New() error = %v", err) + } + if err := sess.Connect(context.Background()); err != nil { + t.Fatalf("Connect() error = %v", err) + } + if !sess.CanSend() { + t.Fatal("CanSend() = false after connect") + } + if err := sess.Close(); err != nil { + t.Fatalf("Close() error = %v", err) + } +} + +func TestNewAuth_UnknownProvider(t *testing.T) { + _, err := olcrtc.New(context.Background(), olcrtc.Config{ + Auth: "no-such-auth", + RoomID: "room", + }) + if err == nil { + t.Fatal("New(bad auth) error = nil") + } +} + +func TestNewAuth_MissingRoomID(t *testing.T) { + registerStubEngine(t, "stub-auth-engine") + registerStubAuth(t, "stub-auth-noroomid", "stub-auth-engine") + + _, err := olcrtc.New(context.Background(), olcrtc.Config{ + Auth: "stub-auth-noroomid", + // RoomID intentionally empty + }) + if err == nil { + t.Fatal("New(auth, no room) error = nil") + } +} + +func TestNewAuth_OK(t *testing.T) { + registerStubEngine(t, "stub-auth-ok-engine") + registerStubAuth(t, "stub-auth-ok", "stub-auth-ok-engine") + + sess, err := olcrtc.New(context.Background(), olcrtc.Config{ + Auth: "stub-auth-ok", + RoomID: "some-room", + }) + if err != nil { + t.Fatalf("New(auth) error = %v", err) + } + if err := sess.Connect(context.Background()); err != nil { + t.Fatalf("Connect() error = %v", err) + } + _ = sess.Close() +} + +func TestRegisterDefaults_Idempotent(_ *testing.T) { + olcrtc.RegisterDefaults() + olcrtc.RegisterDefaults() +} From 5078798204b002c3967b4c31e4ba36ab6906fc92 Mon Sep 17 00:00:00 2001 From: zarazaex69 Date: Mon, 11 May 2026 13:55:49 +0300 Subject: [PATCH 2/4] feat: implement net.Conn via io.Pipe in pkg/olcrtc Add conn.go wrapping Session as net.Conn: Read from pipe fed by OnData, Write calls engine.Send, Close drains pipe and tears down session. Add Session.Dial(ctx) as single-call connect-and-wrap entry point. Co-Authored-By: Claude Sonnet 4.6 --- pkg/olcrtc/conn.go | 51 +++++++++++++++++++++++++++++++++++++++ pkg/olcrtc/olcrtc.go | 49 ++++++++++++++++++++++--------------- pkg/olcrtc/olcrtc_test.go | 37 ++++++++++++++++++++++++++++ 3 files changed, 118 insertions(+), 19 deletions(-) create mode 100644 pkg/olcrtc/conn.go diff --git a/pkg/olcrtc/conn.go b/pkg/olcrtc/conn.go new file mode 100644 index 0000000..c614845 --- /dev/null +++ b/pkg/olcrtc/conn.go @@ -0,0 +1,51 @@ +package olcrtc + +import ( + "errors" + "fmt" + "net" + "time" +) + +// conn wraps a Session as a net.Conn. +// Read is backed by an io.Pipe fed by the engine's OnData callback. +// Write calls Session.Send. +// Deadlines are not supported — callers should use context cancellation. +type conn struct { + s *Session +} + +func (c *conn) Read(b []byte) (int, error) { + n, err := c.s.pr.Read(b) + if err != nil { + return n, fmt.Errorf("read: %w", err) + } + return n, nil +} + +func (c *conn) Write(b []byte) (int, error) { + if err := c.s.inner.Send(b); err != nil { + return 0, fmt.Errorf("write: %w", err) + } + return len(b), nil +} + +func (c *conn) Close() error { + _ = c.s.pw.CloseWithError(net.ErrClosed) + if err := c.s.inner.Close(); err != nil { + return fmt.Errorf("close: %w", err) + } + return nil +} + +func (c *conn) LocalAddr() net.Addr { return webrtcAddr("local") } +func (c *conn) RemoteAddr() net.Addr { return webrtcAddr("remote") } + +func (c *conn) SetDeadline(_ time.Time) error { return errors.ErrUnsupported } +func (c *conn) SetReadDeadline(_ time.Time) error { return errors.ErrUnsupported } +func (c *conn) SetWriteDeadline(_ time.Time) error { return errors.ErrUnsupported } + +type webrtcAddr string + +func (a webrtcAddr) Network() string { return "webrtc" } +func (a webrtcAddr) String() string { return string(a) } diff --git a/pkg/olcrtc/olcrtc.go b/pkg/olcrtc/olcrtc.go index 20099f1..49af5bb 100644 --- a/pkg/olcrtc/olcrtc.go +++ b/pkg/olcrtc/olcrtc.go @@ -1,35 +1,37 @@ // Package olcrtc exposes olcrtc as an embeddable Go library. // -// Typical usage (direct engine, no service-specific auth): +// Typical usage — obtain a [net.Conn]-compatible handle and dial: // // sess, err := olcrtc.New(ctx, olcrtc.Config{ // Engine: "livekit", // URL: "wss://sfu.example/", // Token: "", // }) +// if err != nil { ... } +// conn, err := sess.Dial(ctx) // blocks until WebRTC data channel is ready +// // conn implements net.Conn — pass it to sing-box / any io.ReadWriter consumer // -// Typical usage (built-in auth provider): +// Built-in auth providers (telemost, jazz, wbstream): // // sess, err := olcrtc.New(ctx, olcrtc.Config{ // Auth: "telemost", -// RoomID: "", +// RoomID: "", // }) // -// In both cases the caller must import the engine and (optionally) auth -// packages it needs via blank imports so their init() functions run: +// Import the implementations you need via blank imports, or call [RegisterDefaults]: // // import ( // _ "github.com/openlibrecommunity/olcrtc/internal/engine/livekit" // _ "github.com/openlibrecommunity/olcrtc/internal/auth/telemost" // ) -// -// Or use [RegisterDefaults] to pull in all built-in implementations at once. package olcrtc import ( "context" "errors" "fmt" + "io" + "net" "github.com/openlibrecommunity/olcrtc/internal/auth" "github.com/openlibrecommunity/olcrtc/internal/carrier/builtin" @@ -37,8 +39,6 @@ import ( ) var ( - // ErrAuthOrEngineRequired is returned when neither auth nor engine+URL are supplied. - ErrAuthOrEngineRequired = errors.New("olcrtc: supply either Auth or Engine+URL") // ErrURLRequired is returned when direct mode is used without a URL. ErrURLRequired = errors.New("olcrtc: URL required when using direct engine mode") // ErrTokenRequired is returned when direct mode is used without a token. @@ -68,16 +68,14 @@ type Config struct { // ProxyAddr / ProxyPort configure an outbound SOCKS5 proxy. ProxyAddr string ProxyPort int - // OnData, when set, receives incoming data-channel bytes. If nil the - // session operates in video-track / media-only mode. - OnData func([]byte) } // Session is the library handle returned by [New]. -// Connect must be called before Send. Close releases all resources. +// Call [Session.Dial] to connect and obtain a [net.Conn]. type Session struct { - inner engine.Session - // refresh is stored so it survives reconnects via the engine's Refresh hook. + inner engine.Session + pr *io.PipeReader + pw *io.PipeWriter authProvider auth.Provider authCfg auth.Config } @@ -117,13 +115,14 @@ func newWithAuth(ctx context.Context, cfg Config) (*Session, error) { return nil, fmt.Errorf("olcrtc: auth issue: %w", err) } + pr, pw := io.Pipe() engineName := p.Engine() sess, err := engine.New(ctx, engineName, engine.Config{ URL: creds.URL, Token: creds.Token, Name: cfg.Name, Extra: creds.Extra, - OnData: cfg.OnData, + OnData: func(data []byte) { _, _ = pw.Write(data) }, DNSServer: cfg.DNSServer, ProxyAddr: cfg.ProxyAddr, ProxyPort: cfg.ProxyPort, @@ -136,10 +135,11 @@ func newWithAuth(ctx context.Context, cfg Config) (*Session, error) { }, }) if err != nil { + _ = pw.CloseWithError(err) return nil, fmt.Errorf("olcrtc: engine %q: %w", engineName, err) } - return &Session{inner: sess, authProvider: p, authCfg: authCfg}, nil + return &Session{inner: sess, pr: pr, pw: pw, authProvider: p, authCfg: authCfg}, nil } func newDirect(ctx context.Context, cfg Config) (*Session, error) { @@ -155,20 +155,31 @@ func newDirect(ctx context.Context, cfg Config) (*Session, error) { engineName = "livekit" } + pr, pw := io.Pipe() sess, err := engine.New(ctx, engineName, engine.Config{ URL: cfg.URL, Token: cfg.Token, Name: cfg.Name, - OnData: cfg.OnData, + OnData: func(data []byte) { _, _ = pw.Write(data) }, DNSServer: cfg.DNSServer, ProxyAddr: cfg.ProxyAddr, ProxyPort: cfg.ProxyPort, }) if err != nil { + _ = pw.CloseWithError(err) return nil, fmt.Errorf("olcrtc: engine %q: %w", engineName, err) } - return &Session{inner: sess}, nil + return &Session{inner: sess, pr: pr, pw: pw}, nil +} + +// Dial connects and returns a [net.Conn] backed by the WebRTC data channel. +// It combines [Session.Connect] + wrapping in a single call. +func (s *Session) Dial(ctx context.Context) (net.Conn, error) { + if err := s.Connect(ctx); err != nil { + return nil, err + } + return &conn{s: s}, nil } // Connect establishes the WebRTC connection. Blocks until the data channel (or diff --git a/pkg/olcrtc/olcrtc_test.go b/pkg/olcrtc/olcrtc_test.go index a99b699..4ee6868 100644 --- a/pkg/olcrtc/olcrtc_test.go +++ b/pkg/olcrtc/olcrtc_test.go @@ -158,3 +158,40 @@ func TestRegisterDefaults_Idempotent(_ *testing.T) { olcrtc.RegisterDefaults() olcrtc.RegisterDefaults() } + +func TestDial_RoundTrip(t *testing.T) { + registerStubEngine(t, "stub-dial") + + sess, err := olcrtc.New(context.Background(), olcrtc.Config{ + Engine: "stub-dial", + URL: stubURL, + Token: stubToken, + }) + if err != nil { + t.Fatalf("New() error = %v", err) + } + + c, err := sess.Dial(context.Background()) + if err != nil { + t.Fatalf("Dial() error = %v", err) + } + + // Write should succeed (stub Send is a no-op). + payload := []byte("hello") + n, err := c.Write(payload) + if err != nil || n != len(payload) { + t.Fatalf("Write() = (%d, %v)", n, err) + } + + // Close should unblock any pending Read. + if err := c.Close(); err != nil { + t.Fatalf("Close() error = %v", err) + } + + // Read after close should return an error (pipe closed). + buf := make([]byte, 4) + _, err = c.Read(buf) + if err == nil { + t.Fatal("Read() after Close() should return error") + } +} From f287dc117aee403d6a1f31008568cb99dd83f0dc Mon Sep 17 00:00:00 2001 From: zarazaex69 Date: Mon, 11 May 2026 13:58:51 +0300 Subject: [PATCH 3/4] feat: expose CreateRoom in pkg/olcrtc public API Add olcrtc.CreateRoom(ctx, authName) that delegates to the auth provider's RoomCreator interface. Returns ErrRoomCreationUnsupported for providers that don't support room creation (e.g. telemost). Co-Authored-By: Claude Sonnet 4.6 --- pkg/olcrtc/olcrtc.go | 22 ++++++++++++++++++++++ pkg/olcrtc/olcrtc_test.go | 33 +++++++++++++++++++++++++++++++++ 2 files changed, 55 insertions(+) diff --git a/pkg/olcrtc/olcrtc.go b/pkg/olcrtc/olcrtc.go index 49af5bb..6b2c9d9 100644 --- a/pkg/olcrtc/olcrtc.go +++ b/pkg/olcrtc/olcrtc.go @@ -43,6 +43,8 @@ var ( ErrURLRequired = errors.New("olcrtc: URL required when using direct engine mode") // ErrTokenRequired is returned when direct mode is used without a token. ErrTokenRequired = errors.New("olcrtc: Token required when using direct engine mode") + // ErrRoomCreationUnsupported is returned when the auth provider cannot create rooms. + ErrRoomCreationUnsupported = errors.New("olcrtc: auth provider does not support room creation") ) // Config is the input to [New]. @@ -228,3 +230,23 @@ func (s *Session) SetEndedCallback(cb func(reason string)) { func (s *Session) SetShouldReconnect(fn func() bool) { s.inner.SetShouldReconnect(fn) } + +// CreateRoom creates a new room via the auth provider and returns the room ID. +// Only works when the session was created with Auth set to a provider that +// supports room creation (wbstream, jazz). Returns [ErrRoomCreationUnsupported] +// for providers that don't support it (e.g. telemost). +func CreateRoom(ctx context.Context, authName string) (string, error) { + p, err := auth.Get(authName) + if err != nil { + return "", fmt.Errorf("olcrtc: auth provider %q not registered: %w", authName, err) + } + creator, ok := p.(auth.RoomCreator) + if !ok { + return "", fmt.Errorf("%w: %s", ErrRoomCreationUnsupported, authName) + } + roomID, err := creator.CreateRoom(ctx, auth.Config{}) + if err != nil { + return "", fmt.Errorf("olcrtc: create room: %w", err) + } + return roomID, nil +} diff --git a/pkg/olcrtc/olcrtc_test.go b/pkg/olcrtc/olcrtc_test.go index 4ee6868..47af01e 100644 --- a/pkg/olcrtc/olcrtc_test.go +++ b/pkg/olcrtc/olcrtc_test.go @@ -60,11 +60,22 @@ func (a stubAuth) Issue(_ context.Context, cfg auth.Config) (auth.Credentials, e return auth.Credentials{URL: "wss://stub/", Token: stubToken}, nil } +type stubAuthWithRoomCreator struct{ stubAuth } + +func (stubAuthWithRoomCreator) CreateRoom(_ context.Context, _ auth.Config) (string, error) { + return "created-room-id", nil +} + func registerStubAuth(t *testing.T, name, engineName string) { t.Helper() auth.Register(name, stubAuth{engineName: engineName}) } +func registerStubAuthWithCreator(t *testing.T, name, engineName string) { + t.Helper() + auth.Register(name, stubAuthWithRoomCreator{stubAuth{engineName: engineName}}) +} + // --- tests --- func TestNewDirect_MissingURL(t *testing.T) { @@ -159,6 +170,28 @@ func TestRegisterDefaults_Idempotent(_ *testing.T) { olcrtc.RegisterDefaults() } +func TestCreateRoom_Unsupported(t *testing.T) { + registerStubAuth(t, "stub-nocreate", "stub-direct") + + _, err := olcrtc.CreateRoom(context.Background(), "stub-nocreate") + if !errors.Is(err, olcrtc.ErrRoomCreationUnsupported) { + t.Fatalf("CreateRoom(no creator) = %v, want ErrRoomCreationUnsupported", err) + } +} + +func TestCreateRoom_OK(t *testing.T) { + registerStubEngine(t, "stub-creator-engine") + registerStubAuthWithCreator(t, "stub-creator", "stub-creator-engine") + + roomID, err := olcrtc.CreateRoom(context.Background(), "stub-creator") + if err != nil { + t.Fatalf("CreateRoom() error = %v", err) + } + if roomID == "" { + t.Fatal("CreateRoom() returned empty room ID") + } +} + func TestDial_RoundTrip(t *testing.T) { registerStubEngine(t, "stub-dial") From f4ab63b5fa0af72936264d96cade367cdc7884c1 Mon Sep 17 00:00:00 2001 From: zarazaex69 Date: Mon, 11 May 2026 14:02:28 +0300 Subject: [PATCH 4/4] =?UTF-8?q?feat:=20wire=20WatchConnection=20into=20Dia?= =?UTF-8?q?l=20=E2=80=94=20Read=20unblocks=20on=20session=20end?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Dial now sets SetEndedCallback to close the pipe with ErrSessionEnded and starts WatchConnection in a goroutine. Consumers (e.g. sing-box) get a concrete error from Read when the session dies permanently. Co-Authored-By: Claude Sonnet 4.6 --- pkg/olcrtc/olcrtc.go | 8 +++++ pkg/olcrtc/olcrtc_test.go | 70 +++++++++++++++++++++++++++++++++++---- 2 files changed, 72 insertions(+), 6 deletions(-) diff --git a/pkg/olcrtc/olcrtc.go b/pkg/olcrtc/olcrtc.go index 6b2c9d9..7999d63 100644 --- a/pkg/olcrtc/olcrtc.go +++ b/pkg/olcrtc/olcrtc.go @@ -45,6 +45,8 @@ var ( ErrTokenRequired = errors.New("olcrtc: Token required when using direct engine mode") // ErrRoomCreationUnsupported is returned when the auth provider cannot create rooms. ErrRoomCreationUnsupported = errors.New("olcrtc: auth provider does not support room creation") + // ErrSessionEnded is returned from Read/Write when the session has ended permanently. + ErrSessionEnded = errors.New("olcrtc: session ended") ) // Config is the input to [New]. @@ -177,10 +179,16 @@ func newDirect(ctx context.Context, cfg Config) (*Session, error) { // Dial connects and returns a [net.Conn] backed by the WebRTC data channel. // It combines [Session.Connect] + wrapping in a single call. +// The connection watcher runs in the background for the lifetime of ctx; +// when the session ends permanently, Read will return an error. func (s *Session) Dial(ctx context.Context) (net.Conn, error) { + s.inner.SetEndedCallback(func(_ string) { + _ = s.pw.CloseWithError(ErrSessionEnded) + }) if err := s.Connect(ctx); err != nil { return nil, err } + go s.inner.WatchConnection(ctx) return &conn{s: s}, nil } diff --git a/pkg/olcrtc/olcrtc_test.go b/pkg/olcrtc/olcrtc_test.go index 47af01e..27ca8a4 100644 --- a/pkg/olcrtc/olcrtc_test.go +++ b/pkg/olcrtc/olcrtc_test.go @@ -4,6 +4,7 @@ import ( "context" "errors" "testing" + "time" "github.com/openlibrecommunity/olcrtc/internal/auth" "github.com/openlibrecommunity/olcrtc/internal/engine" @@ -18,15 +19,21 @@ const ( // --- stub engine --- -type stubSession struct{ connected bool } +type stubSession struct { + connected bool + onEnded func(string) + watchBlock chan struct{} // closed to unblock WatchConnection +} + +func newStubSession() *stubSession { return &stubSession{watchBlock: make(chan struct{})} } func (s *stubSession) Connect(_ context.Context) error { s.connected = true; return nil } func (s *stubSession) Send(_ []byte) error { return nil } func (s *stubSession) Close() error { return nil } func (s *stubSession) SetReconnectCallback(_ func(*webrtc.DataChannel)) {} func (s *stubSession) SetShouldReconnect(_ func() bool) {} -func (s *stubSession) SetEndedCallback(_ func(string)) {} -func (s *stubSession) WatchConnection(_ context.Context) {} +func (s *stubSession) SetEndedCallback(cb func(string)) { s.onEnded = cb } +func (s *stubSession) WatchConnection(_ context.Context) { <-s.watchBlock } func (s *stubSession) CanSend() bool { return s.connected } func (s *stubSession) GetSendQueue() chan []byte { return nil } func (s *stubSession) GetBufferedAmount() uint64 { return 0 } @@ -38,12 +45,24 @@ var _ engine.Session = (*stubSession)(nil) func registerStubEngine(t *testing.T, name string) { t.Helper() engine.Register(name, func(_ context.Context, _ engine.Config) (engine.Session, error) { - return &stubSession{}, nil + return newStubSession(), nil }) t.Cleanup(func() { - // Re-register a no-op so subsequent tests don't break. engine.Register(name, func(_ context.Context, _ engine.Config) (engine.Session, error) { - return &stubSession{}, nil + return newStubSession(), nil + }) + }) +} + +// registerStubEngineControlled registers an engine that returns a pre-built stub the test controls. +func registerStubEngineControlled(t *testing.T, name string, stub *stubSession) { + t.Helper() + engine.Register(name, func(_ context.Context, _ engine.Config) (engine.Session, error) { + return stub, nil + }) + t.Cleanup(func() { + engine.Register(name, func(_ context.Context, _ engine.Config) (engine.Session, error) { + return newStubSession(), nil }) }) } @@ -192,6 +211,45 @@ func TestCreateRoom_OK(t *testing.T) { } } +func TestDial_ReadUnblocksOnSessionEnd(t *testing.T) { + stub := newStubSession() + registerStubEngineControlled(t, "stub-ended", stub) + + sess, err := olcrtc.New(context.Background(), olcrtc.Config{ + Engine: "stub-ended", + URL: stubURL, + Token: stubToken, + }) + if err != nil { + t.Fatalf("New() error = %v", err) + } + + c, err := sess.Dial(context.Background()) + if err != nil { + t.Fatalf("Dial() error = %v", err) + } + + readErr := make(chan error, 1) + go func() { + buf := make([]byte, 4) + _, err := c.Read(buf) + readErr <- err + }() + + // Simulate session ending permanently. + stub.onEnded("test reason") + close(stub.watchBlock) + + select { + case err := <-readErr: + if err == nil { + t.Fatal("Read() should return error after session ended") + } + case <-time.After(time.Second): + t.Fatal("Read() did not unblock after session ended") + } +} + func TestDial_RoundTrip(t *testing.T) { registerStubEngine(t, "stub-dial")