From 6e6265799ab403dea22e34ee044d4a92f8403167 Mon Sep 17 00:00:00 2001 From: zarazaex69 Date: Tue, 14 Apr 2026 01:10:57 +0300 Subject: [PATCH] feat(provider): abstract provider interface and add jazz support --- cmd/olcrtc/main.go | 40 +- internal/client/client.go | 30 +- internal/provider/errors.go | 13 + internal/provider/jazz/api.go | 132 ++++++ internal/provider/jazz/peer.go | 528 +++++++++++++++++++++++ internal/provider/jazz/provider.go | 66 +++ internal/provider/provider.go | 53 +++ internal/{ => provider}/telemost/api.go | 0 internal/{ => provider}/telemost/peer.go | 16 +- internal/provider/telemost/provider.go | 66 +++ internal/server/server.go | 27 +- 11 files changed, 933 insertions(+), 38 deletions(-) create mode 100644 internal/provider/errors.go create mode 100644 internal/provider/jazz/api.go create mode 100644 internal/provider/jazz/peer.go create mode 100644 internal/provider/jazz/provider.go create mode 100644 internal/provider/provider.go rename internal/{ => provider}/telemost/api.go (100%) rename internal/{ => provider}/telemost/peer.go (97%) create mode 100644 internal/provider/telemost/provider.go diff --git a/cmd/olcrtc/main.go b/cmd/olcrtc/main.go index 846f4e4..98d1116 100644 --- a/cmd/olcrtc/main.go +++ b/cmd/olcrtc/main.go @@ -16,6 +16,9 @@ import ( "github.com/openlibrecommunity/olcrtc/internal/client" "github.com/openlibrecommunity/olcrtc/internal/logger" "github.com/openlibrecommunity/olcrtc/internal/names" + "github.com/openlibrecommunity/olcrtc/internal/provider" + _ "github.com/openlibrecommunity/olcrtc/internal/provider/jazz" + _ "github.com/openlibrecommunity/olcrtc/internal/provider/telemost" "github.com/openlibrecommunity/olcrtc/internal/server" ) @@ -34,9 +37,10 @@ type config struct { } var ( - errUnsupportedProvider = errors.New("only telemost provider supported") errRoomIDRequired = errors.New("room ID required") errModeRequired = errors.New("specify -mode srv or -mode cnc") + errProviderRequired = errors.New("provider required (use -provider telemost or -provider jazz)") + errUnsupportedProvider = errors.New("unsupported provider") ) func main() { @@ -86,8 +90,8 @@ func parseFlags() config { cfg := config{} flag.StringVar(&cfg.mode, "mode", "", "Mode: srv or cnc") - flag.StringVar(&cfg.roomID, "id", "", "Telemost room ID") - flag.StringVar(&cfg.provider, "provider", "telemost", "Provider (telemost only)") + flag.StringVar(&cfg.roomID, "id", "", "Room ID") + flag.StringVar(&cfg.provider, "provider", "", "Provider: telemost or jazz (required)") flag.IntVar(&cfg.socksPort, "socks-port", 1080, "SOCKS5 port (client only)") flag.StringVar(&cfg.socksHost, "socks-host", "127.0.0.1", "SOCKS5 listen host (client only)") flag.StringVar(&cfg.keyHex, "key", "", "Shared encryption key (hex)") @@ -112,9 +116,20 @@ func configureLogging(debug bool) { } func validateConfig(cfg config) error { + available := provider.Available() + validProvider := false + for _, p := range available { + if cfg.provider == p { + validProvider = true + break + } + } + switch { - case cfg.provider != "telemost": - return errUnsupportedProvider + case cfg.provider == "": + return errProviderRequired + case !validProvider: + return fmt.Errorf("%w: %s (available: %v)", errUnsupportedProvider, cfg.provider, available) case cfg.roomID == "": return errRoomIDRequired case cfg.mode != "srv" && cfg.mode != "cnc": @@ -148,12 +163,13 @@ func loadNames(dataDir string) error { } func runMode(ctx context.Context, cfg config, errCh chan<- error) { - roomURL := "https://telemost.yandex.ru/j/" + cfg.roomID + roomURL := buildRoomURL(cfg.provider, cfg.roomID) switch cfg.mode { case "srv": errCh <- server.Run( ctx, + cfg.provider, roomURL, cfg.keyHex, cfg.dnsServer, @@ -163,6 +179,7 @@ func runMode(ctx context.Context, cfg config, errCh chan<- error) { case "cnc": errCh <- client.Run( ctx, + cfg.provider, roomURL, cfg.keyHex, cfg.socksPort, @@ -173,6 +190,17 @@ func runMode(ctx context.Context, cfg config, errCh chan<- error) { } } +func buildRoomURL(providerName, roomID string) string { + switch providerName { + case "telemost": + return "https://telemost.yandex.ru/j/" + roomID + case "jazz": + return roomID + default: + return roomID + } +} + func waitForShutdown(errCh <-chan error) error { done := make(chan error, 1) go func() { diff --git a/internal/client/client.go b/internal/client/client.go index d1a342c..02e199b 100644 --- a/internal/client/client.go +++ b/internal/client/client.go @@ -21,7 +21,9 @@ import ( "github.com/openlibrecommunity/olcrtc/internal/logger" "github.com/openlibrecommunity/olcrtc/internal/mux" "github.com/openlibrecommunity/olcrtc/internal/names" - "github.com/openlibrecommunity/olcrtc/internal/telemost" + "github.com/openlibrecommunity/olcrtc/internal/provider" + _ "github.com/openlibrecommunity/olcrtc/internal/provider/jazz" + _ "github.com/openlibrecommunity/olcrtc/internal/provider/telemost" "github.com/pion/webrtc/v4" ) @@ -31,9 +33,8 @@ var ( errNoConnectedPeers = errors.New("no connected peers available") ) -// Client manages the client-side mux and SOCKS5 listener. type Client struct { - peers []*telemost.Peer + peers []provider.Provider cipher *crypto.Cipher mux *mux.Multiplexer clientID uint32 @@ -43,9 +44,9 @@ type Client struct { const defaultSOCKSListenHost = "127.0.0.1" -// Run starts the client and listens for SOCKS5 traffic. func Run( ctx context.Context, + providerName, roomURL, keyHex string, socksPort int, @@ -53,12 +54,12 @@ func Run( socksUser, socksPass string, ) error { - return RunWithReady(ctx, roomURL, keyHex, socksPort, socksHost, socksUser, socksPass, nil) + return RunWithReady(ctx, providerName, roomURL, keyHex, socksPort, socksHost, socksUser, socksPass, nil) } -// RunWithReady starts the client and invokes onReady once the local SOCKS5 listener is accepting connections. func RunWithReady( ctx context.Context, + providerName, roomURL, keyHex string, socksPort int, @@ -88,13 +89,13 @@ func RunWithReady( c := &Client{ cipher: cipher, clientID: uint32(time.Now().UnixNano() & 0xFFFFFFFF), - peers: make([]*telemost.Peer, 0, 1), + peers: make([]provider.Provider, 0, 1), } c.mux = mux.New(c.clientID, c.sendFrame) for peerID := range 1 { - if err := c.addPeer(runCtx, roomURL, peerID, cancel); err != nil { + if err := c.addPeer(runCtx, providerName, roomURL, peerID, cancel); err != nil { return fmt.Errorf("addPeer failed: %w", err) } } @@ -152,7 +153,7 @@ func (c *Client) sendFrame(frame []byte) error { return nil } -func waitUntilPeersCanSend(peers []*telemost.Peer) { +func waitUntilPeersCanSend(peers []provider.Provider) { for { canSend := true for _, peer := range peers { @@ -170,7 +171,7 @@ func waitUntilPeersCanSend(peers []*telemost.Peer) { } } -func (c *Client) nextPeer() (*telemost.Peer, error) { +func (c *Client) nextPeer() (provider.Provider, error) { switch len(c.peers) { case 0: return nil, errNoConnectedPeers @@ -183,11 +184,16 @@ func (c *Client) nextPeer() (*telemost.Peer, error) { func (c *Client) addPeer( runCtx context.Context, + providerName, roomURL string, peerID int, cancel context.CancelFunc, ) error { - peer, err := telemost.NewPeer(runCtx, roomURL, names.Generate(), c.onData) + peer, err := provider.New(runCtx, providerName, provider.Config{ + RoomURL: roomURL, + Name: names.Generate(), + OnData: c.onData, + }) if err != nil { return fmt.Errorf("create peer %d: %w", peerID, err) } @@ -203,7 +209,7 @@ func (c *Client) addPeer( c.peers = append(c.peers, peer) - log.Printf("Connecting peer %d to Telemost...", peerID) + log.Printf("Connecting peer %d to %s...", peerID, providerName) if err := peer.Connect(runCtx); err != nil { return fmt.Errorf("connect peer %d: %w", peerID, err) } diff --git a/internal/provider/errors.go b/internal/provider/errors.go new file mode 100644 index 0000000..6a63ea2 --- /dev/null +++ b/internal/provider/errors.go @@ -0,0 +1,13 @@ +package provider + +import "errors" + +var ( + ErrProviderNotFound = errors.New("provider not found") + ErrDataChannelTimeout = errors.New("datachannel timeout") + ErrDataChannelNotReady = errors.New("datachannel not ready") + ErrSendQueueClosed = errors.New("send queue closed") + ErrSendQueueTimeout = errors.New("send queue timeout") + ErrSessionClosed = errors.New("session closed") + ErrPeerClosed = errors.New("peer closed") +) diff --git a/internal/provider/jazz/api.go b/internal/provider/jazz/api.go new file mode 100644 index 0000000..cfd604f --- /dev/null +++ b/internal/provider/jazz/api.go @@ -0,0 +1,132 @@ +package jazz + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "net/http" + + "github.com/google/uuid" + "github.com/openlibrecommunity/olcrtc/internal/protect" +) + +const apiBase = "https://bk.salutejazz.ru" + +type RoomInfo struct { + RoomID string `json:"roomId"` + Password string `json:"password"` + ConnectorURL string `json:"connectorUrl"` +} + +func createRoom(ctx context.Context) (*RoomInfo, error) { + clientID := uuid.New().String() + headers := map[string]string{ + "X-Jazz-ClientId": clientID, + "X-Jazz-AuthType": "ANONYMOUS", + "X-Client-AuthType": "ANONYMOUS", + "Content-Type": "application/json", + } + + createPayload := map[string]any{ + "title": "olcrtc", + "guestEnabled": true, + "lobbyEnabled": false, + "serverVideoRecordAutoStartEnabled": false, + "sipEnabled": false, + "moderatorEmails": []string{}, + "summarizationEnabled": false, + "room3dEnabled": false, + "room3dScene": "XRLobby", + } + + body, err := json.Marshal(createPayload) + if err != nil { + return nil, fmt.Errorf("marshal create payload: %w", err) + } + + req, err := http.NewRequestWithContext(ctx, http.MethodPost, apiBase+"/room/create-meeting", + bytes.NewReader(body)) + if err != nil { + return nil, fmt.Errorf("create request: %w", err) + } + + for k, v := range headers { + req.Header.Set(k, v) + } + + client := protect.NewHTTPClient() + resp, err := client.Do(req) + if err != nil { + return nil, fmt.Errorf("do create request: %w", err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + return nil, fmt.Errorf("create room failed: status %d", resp.StatusCode) + } + + var createResp struct { + RoomID string `json:"roomId"` + Password string `json:"password"` + } + if err := json.NewDecoder(resp.Body).Decode(&createResp); err != nil { + return nil, fmt.Errorf("decode create response: %w", err) + } + + preconnectPayload := map[string]any{ + "password": createResp.Password, + "jazzNextMigration": map[string]any{ + "b2bBaseRoomSupport": true, + "demoRoomBaseSupport": true, + "demoRoomVersionSupport": 2, + "mediaWithoutAutoSubscribeSupport": true, + "webinarSpeakerSupport": true, + "webinarViewerSupport": true, + "sdkRoomSupport": true, + "sberclassRoomSupport": true, + }, + } + + preBody, err := json.Marshal(preconnectPayload) + if err != nil { + return nil, fmt.Errorf("marshal preconnect payload: %w", err) + } + + preReq, err := http.NewRequestWithContext( + ctx, + http.MethodPost, + fmt.Sprintf("%s/room/%s/preconnect", apiBase, createResp.RoomID), + bytes.NewReader(preBody), + ) + if err != nil { + return nil, fmt.Errorf("create preconnect request: %w", err) + } + + for k, v := range headers { + preReq.Header.Set(k, v) + } + + preResp, err := client.Do(preReq) + if err != nil { + return nil, fmt.Errorf("do preconnect request: %w", err) + } + defer preResp.Body.Close() + + if preResp.StatusCode != http.StatusOK { + return nil, fmt.Errorf("preconnect failed: status %d", preResp.StatusCode) + } + + var preconnectResp struct { + ConnectorURL string `json:"connectorUrl"` + } + if err := json.NewDecoder(preResp.Body).Decode(&preconnectResp); err != nil { + return nil, fmt.Errorf("decode preconnect response: %w", err) + } + + return &RoomInfo{ + RoomID: createResp.RoomID, + Password: createResp.Password, + ConnectorURL: preconnectResp.ConnectorURL, + }, nil +} diff --git a/internal/provider/jazz/peer.go b/internal/provider/jazz/peer.go new file mode 100644 index 0000000..3bc0a81 --- /dev/null +++ b/internal/provider/jazz/peer.go @@ -0,0 +1,528 @@ +package jazz + +import ( + "context" + "fmt" + "log" + "sync" + "sync/atomic" + "time" + + "github.com/google/uuid" + "github.com/gorilla/websocket" + "github.com/openlibrecommunity/olcrtc/internal/logger" + "github.com/openlibrecommunity/olcrtc/internal/protect" + "github.com/openlibrecommunity/olcrtc/internal/provider" + "github.com/pion/webrtc/v4" +) + +type Peer struct { + name string + roomInfo *RoomInfo + ws *websocket.Conn + wsMu sync.Mutex + pcSub *webrtc.PeerConnection + pcPub *webrtc.PeerConnection + dc *webrtc.DataChannel + onData func([]byte) + onReconnect func(*webrtc.DataChannel) + shouldReconnect func() bool + reconnectCh chan struct{} + closeCh chan struct{} + closed atomic.Bool + reconnecting atomic.Bool + sendQueue chan []byte + sendQueueClosed atomic.Bool + onEnded func(string) + sessionCloseCh chan struct{} + sessionMu sync.Mutex + wg sync.WaitGroup + groupID string +} + +func NewPeer(ctx context.Context, name string, onData func([]byte)) (*Peer, error) { + roomInfo, err := createRoom(ctx) + if err != nil { + return nil, fmt.Errorf("create room: %w", err) + } + + log.Printf("Jazz room created: %s (password: %s)", roomInfo.RoomID, roomInfo.Password) + + return &Peer{ + name: name, + roomInfo: roomInfo, + onData: onData, + reconnectCh: make(chan struct{}, 1), + closeCh: make(chan struct{}), + sessionCloseCh: make(chan struct{}), + sendQueue: make(chan []byte, 5000), + }, nil +} + +func (p *Peer) Connect(ctx context.Context) error { + p.closed.Store(false) + + config := webrtc.Configuration{ + ICEServers: []webrtc.ICEServer{}, + SDPSemantics: webrtc.SDPSemanticsUnifiedPlan, + BundlePolicy: webrtc.BundlePolicyMaxBundle, + } + + settingEngine := webrtc.SettingEngine{} + if protect.Protector != nil { + settingEngine.SetICEProxyDialer(protect.NewProxyDialer()) + } + api := webrtc.NewAPI(webrtc.WithSettingEngine(settingEngine)) + + var err error + p.pcSub, err = api.NewPeerConnection(config) + if err != nil { + return fmt.Errorf("create subscriber pc: %w", err) + } + + p.pcPub, err = api.NewPeerConnection(config) + if err != nil { + return fmt.Errorf("create publisher pc: %w", err) + } + + p.dc, err = p.pcPub.CreateDataChannel("_reliable", &webrtc.DataChannelInit{ + Ordered: func() *bool { v := true; return &v }(), + }) + if err != nil { + return fmt.Errorf("create datachannel: %w", err) + } + + dcReady := make(chan struct{}) + p.setupDataChannelHandlers(dcReady) + + if err := p.dialWebSocket(); err != nil { + return err + } + + if err := p.sendJoin(); err != nil { + return err + } + + p.wg.Add(1) + go func() { + defer p.wg.Done() + p.handleSignaling(ctx) + }() + + select { + case <-dcReady: + return nil + case <-time.After(30 * time.Second): + return provider.ErrDataChannelTimeout + case <-ctx.Done(): + return fmt.Errorf("connect cancelled: %w", ctx.Err()) + } +} + +func (p *Peer) dialWebSocket() error { + wsDialer := websocket.Dialer{ + NetDialContext: protect.DialContext, + HandshakeTimeout: 15 * time.Second, + } + + ws, resp, err := wsDialer.Dial(p.roomInfo.ConnectorURL, nil) + if err != nil { + return fmt.Errorf("dial websocket: %w", err) + } + if resp != nil && resp.Body != nil { + _ = resp.Body.Close() + } + + p.ws = ws + ws.SetPongHandler(func(string) error { + _ = ws.SetReadDeadline(time.Now().Add(60 * time.Second)) + return nil + }) + _ = ws.SetReadDeadline(time.Now().Add(60 * time.Second)) + + return nil +} + +func (p *Peer) sendJoin() error { + joinMsg := map[string]any{ + "roomId": p.roomInfo.RoomID, + "event": "join", + "requestId": uuid.New().String(), + "payload": map[string]any{ + "password": p.roomInfo.Password, + "participantName": p.name, + "supportedFeatures": map[string]any{ + "attachedRooms": true, + "sessionGroups": true, + "transcription": true, + }, + "isSilent": false, + }, + } + + p.wsMu.Lock() + defer p.wsMu.Unlock() + return p.ws.WriteJSON(joinMsg) +} + +func (p *Peer) setupDataChannelHandlers(dcReady chan struct{}) { + p.dc.OnOpen(func() { + p.wg.Add(1) + go func() { + defer p.wg.Done() + p.processSendQueue() + }() + close(dcReady) + }) + + p.dc.OnClose(func() { + if !p.closed.Load() { + p.queueReconnect() + } + }) + + p.dc.OnMessage(func(msg webrtc.DataChannelMessage) { + if p.onData != nil && len(msg.Data) > 0 { + p.onData(msg.Data) + } + }) + + p.pcSub.OnDataChannel(func(dc *webrtc.DataChannel) { + dc.OnMessage(func(msg webrtc.DataChannelMessage) { + if p.onData != nil && len(msg.Data) > 0 { + p.onData(msg.Data) + } + }) + }) +} + +func (p *Peer) handleSignaling(ctx context.Context) { + for { + var msg map[string]any + if err := p.ws.ReadJSON(&msg); err != nil { + logger.Debugf("ws read error: %v", err) + if !p.closed.Load() { + p.queueReconnect() + } + return + } + + p.updateWSDeadline() + + event, _ := msg["event"].(string) + payload, _ := msg["payload"].(map[string]any) + + switch event { + case "join-response": + p.handleJoinResponse(payload) + case "media-out": + p.handleMediaOut(ctx, payload) + } + } +} + +func (p *Peer) handleJoinResponse(payload map[string]any) { + group, _ := payload["participantGroup"].(map[string]any) + p.groupID, _ = group["groupId"].(string) + logger.Verbosef("Jazz peer joined: groupId=%s", p.groupID) +} + +func (p *Peer) handleMediaOut(ctx context.Context, payload map[string]any) { + method, _ := payload["method"].(string) + + switch method { + case "rtc:config": + p.handleRTCConfig(payload) + case "rtc:join": + logger.Verbosef("Jazz rtc:join received") + case "rtc:offer": + p.handleSubscriberOffer(ctx, payload) + case "rtc:answer": + p.handlePublisherAnswer(payload) + case "rtc:ice": + p.handleICE(payload) + } +} + +func (p *Peer) handleRTCConfig(payload map[string]any) { + config, _ := payload["configuration"].(map[string]any) + servers, _ := config["iceServers"].([]any) + + var iceServers []webrtc.ICEServer + for _, s := range servers { + server, _ := s.(map[string]any) + urls, _ := server["urls"].([]any) + username, _ := server["username"].(string) + credential, _ := server["credential"].(string) + + var urlStrs []string + for _, u := range urls { + if urlStr, ok := u.(string); ok && urlStr != "" { + urlStrs = append(urlStrs, urlStr) + } + } + + if len(urlStrs) > 0 { + iceServers = append(iceServers, webrtc.ICEServer{ + URLs: urlStrs, + Username: username, + Credential: credential, + }) + } + } + + if len(iceServers) > 0 { + newConfig := webrtc.Configuration{ + ICEServers: iceServers, + SDPSemantics: webrtc.SDPSemanticsUnifiedPlan, + BundlePolicy: webrtc.BundlePolicyMaxBundle, + } + _ = p.pcSub.SetConfiguration(newConfig) + _ = p.pcPub.SetConfiguration(newConfig) + } +} + +func (p *Peer) handleSubscriberOffer(ctx context.Context, payload map[string]any) { + desc, _ := payload["description"].(map[string]any) + sdp, _ := desc["sdp"].(string) + + if err := p.pcSub.SetRemoteDescription(webrtc.SessionDescription{ + Type: webrtc.SDPTypeOffer, + SDP: sdp, + }); err != nil { + logger.Debugf("set remote desc error: %v", err) + return + } + + answer, err := p.pcSub.CreateAnswer(nil) + if err != nil { + logger.Debugf("create answer error: %v", err) + return + } + + if err := p.pcSub.SetLocalDescription(answer); err != nil { + logger.Debugf("set local desc error: %v", err) + return + } + + p.wsMu.Lock() + _ = p.ws.WriteJSON(map[string]any{ + "roomId": p.roomInfo.RoomID, + "event": "media-in", + "groupId": p.groupID, + "requestId": uuid.New().String(), + "payload": map[string]any{ + "method": "rtc:answer", + "description": map[string]any{ + "type": "answer", + "sdp": answer.SDP, + }, + }, + }) + p.wsMu.Unlock() + + time.Sleep(300 * time.Millisecond) + p.sendPublisherOffer() +} + +func (p *Peer) sendPublisherOffer() { + offer, err := p.pcPub.CreateOffer(nil) + if err != nil { + logger.Debugf("create pub offer error: %v", err) + return + } + + if err := p.pcPub.SetLocalDescription(offer); err != nil { + logger.Debugf("set local pub desc error: %v", err) + return + } + + p.wsMu.Lock() + _ = p.ws.WriteJSON(map[string]any{ + "roomId": p.roomInfo.RoomID, + "event": "media-in", + "groupId": p.groupID, + "requestId": uuid.New().String(), + "payload": map[string]any{ + "method": "rtc:offer", + "description": map[string]any{ + "type": "offer", + "sdp": offer.SDP, + }, + }, + }) + p.wsMu.Unlock() +} + +func (p *Peer) handlePublisherAnswer(payload map[string]any) { + desc, _ := payload["description"].(map[string]any) + sdp, _ := desc["sdp"].(string) + + if err := p.pcPub.SetRemoteDescription(webrtc.SessionDescription{ + Type: webrtc.SDPTypeAnswer, + SDP: sdp, + }); err != nil { + logger.Debugf("set remote pub desc error: %v", err) + } +} + +func (p *Peer) handleICE(payload map[string]any) { + candidates, _ := payload["rtcIceCandidates"].([]any) + + for _, c := range candidates { + cand, _ := c.(map[string]any) + candStr, _ := cand["candidate"].(string) + target, _ := cand["target"].(string) + sdpMid, _ := cand["sdpMid"].(string) + sdpMLineIndex, _ := cand["sdpMLineIndex"].(float64) + + init := webrtc.ICECandidateInit{ + Candidate: candStr, + SDPMid: &sdpMid, + SDPMLineIndex: func() *uint16 { v := uint16(sdpMLineIndex); return &v }(), + } + + switch target { + case "SUBSCRIBER": + _ = p.pcSub.AddICECandidate(init) + case "PUBLISHER": + _ = p.pcPub.AddICECandidate(init) + } + } +} + +func (p *Peer) updateWSDeadline() { + p.wsMu.Lock() + if p.ws != nil { + _ = p.ws.SetReadDeadline(time.Now().Add(60 * time.Second)) + } + p.wsMu.Unlock() +} + +func (p *Peer) Send(data []byte) error { + if p.dc == nil || p.dc.ReadyState() != webrtc.DataChannelStateOpen { + return provider.ErrDataChannelNotReady + } + + if p.sendQueueClosed.Load() { + return provider.ErrSendQueueClosed + } + + select { + case p.sendQueue <- data: + return nil + case <-time.After(50 * time.Millisecond): + return provider.ErrSendQueueTimeout + } +} + +func (p *Peer) processSendQueue() { + for { + select { + case <-p.sessionCloseCh: + return + case <-p.closeCh: + return + case data := <-p.sendQueue: + if err := p.dc.Send(data); err != nil { + logger.Debugf("send error: %v", err) + p.queueReconnect() + return + } + time.Sleep(2 * time.Millisecond) + } + } +} + +func (p *Peer) Close() error { + p.closed.Store(true) + p.sendQueueClosed.Store(true) + + close(p.closeCh) + + done := make(chan struct{}) + go func() { + p.wg.Wait() + close(done) + }() + + select { + case <-done: + case <-time.After(2 * time.Second): + } + + if p.dc != nil { + _ = p.dc.Close() + } + if p.pcPub != nil { + _ = p.pcPub.Close() + } + if p.pcSub != nil { + _ = p.pcSub.Close() + } + if p.ws != nil { + p.wsMu.Lock() + _ = p.ws.WriteControl(websocket.CloseMessage, + websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""), + time.Now().Add(time.Second)) + _ = p.ws.Close() + p.wsMu.Unlock() + } + + return nil +} + +func (p *Peer) SetReconnectCallback(cb func(*webrtc.DataChannel)) { + p.onReconnect = cb +} + +func (p *Peer) SetShouldReconnect(fn func() bool) { + p.shouldReconnect = fn +} + +func (p *Peer) SetEndedCallback(cb func(string)) { + p.onEnded = cb +} + +func (p *Peer) WatchConnection(ctx context.Context) { + for { + select { + case <-ctx.Done(): + return + case <-p.closeCh: + return + case <-p.reconnectCh: + } + } +} + +func (p *Peer) CanSend() bool { + if p.dc == nil || p.dc.ReadyState() != webrtc.DataChannelStateOpen { + return false + } + return len(p.sendQueue) < 4000 +} + +func (p *Peer) GetSendQueue() chan []byte { + return p.sendQueue +} + +func (p *Peer) GetBufferedAmount() uint64 { + if p.dc != nil { + return p.dc.BufferedAmount() + } + return 0 +} + +func (p *Peer) queueReconnect() { + if p.closed.Load() || p.reconnecting.Load() { + return + } + if p.shouldReconnect != nil && !p.shouldReconnect() { + return + } + select { + case p.reconnectCh <- struct{}{}: + default: + } +} diff --git a/internal/provider/jazz/provider.go b/internal/provider/jazz/provider.go new file mode 100644 index 0000000..7c5022c --- /dev/null +++ b/internal/provider/jazz/provider.go @@ -0,0 +1,66 @@ +package jazz + +import ( + "context" + "fmt" + + "github.com/openlibrecommunity/olcrtc/internal/provider" + "github.com/pion/webrtc/v4" +) + +type jazzProvider struct { + peer *Peer +} + +func New(ctx context.Context, cfg provider.Config) (provider.Provider, error) { + peer, err := NewPeer(ctx, cfg.Name, cfg.OnData) + if err != nil { + return nil, fmt.Errorf("create jazz peer: %w", err) + } + + return &jazzProvider{peer: peer}, nil +} + +func (j *jazzProvider) Connect(ctx context.Context) error { + return j.peer.Connect(ctx) +} + +func (j *jazzProvider) Send(data []byte) error { + return j.peer.Send(data) +} + +func (j *jazzProvider) Close() error { + return j.peer.Close() +} + +func (j *jazzProvider) SetReconnectCallback(cb func(*webrtc.DataChannel)) { + j.peer.SetReconnectCallback(cb) +} + +func (j *jazzProvider) SetShouldReconnect(fn func() bool) { + j.peer.SetShouldReconnect(fn) +} + +func (j *jazzProvider) SetEndedCallback(cb func(string)) { + j.peer.SetEndedCallback(cb) +} + +func (j *jazzProvider) WatchConnection(ctx context.Context) { + j.peer.WatchConnection(ctx) +} + +func (j *jazzProvider) CanSend() bool { + return j.peer.CanSend() +} + +func (j *jazzProvider) GetSendQueue() chan []byte { + return j.peer.GetSendQueue() +} + +func (j *jazzProvider) GetBufferedAmount() uint64 { + return j.peer.GetBufferedAmount() +} + +func init() { + provider.Register("jazz", New) +} diff --git a/internal/provider/provider.go b/internal/provider/provider.go new file mode 100644 index 0000000..0f2ae7e --- /dev/null +++ b/internal/provider/provider.go @@ -0,0 +1,53 @@ +package provider + +import ( + "context" + + "github.com/pion/webrtc/v4" +) + +type Provider interface { + Connect(ctx context.Context) error + Send(data []byte) error + Close() error + SetReconnectCallback(cb func(*webrtc.DataChannel)) + SetShouldReconnect(fn func() bool) + SetEndedCallback(cb func(string)) + WatchConnection(ctx context.Context) + CanSend() bool + GetSendQueue() chan []byte + GetBufferedAmount() uint64 +} + +type Config struct { + RoomURL string + Name string + OnData func([]byte) + DNSServer string + ProxyAddr string + ProxyPort int +} + +type Factory func(ctx context.Context, cfg Config) (Provider, error) + +var providers = make(map[string]Factory) + +func Register(name string, factory Factory) { + providers[name] = factory +} + +func New(ctx context.Context, name string, cfg Config) (Provider, error) { + factory, ok := providers[name] + if !ok { + return nil, ErrProviderNotFound + } + return factory(ctx, cfg) +} + +func Available() []string { + names := make([]string, 0, len(providers)) + for name := range providers { + names = append(names, name) + } + return names +} diff --git a/internal/telemost/api.go b/internal/provider/telemost/api.go similarity index 100% rename from internal/telemost/api.go rename to internal/provider/telemost/api.go diff --git a/internal/telemost/peer.go b/internal/provider/telemost/peer.go similarity index 97% rename from internal/telemost/peer.go rename to internal/provider/telemost/peer.go index 8a25965..85ddd02 100644 --- a/internal/telemost/peer.go +++ b/internal/provider/telemost/peer.go @@ -29,18 +29,12 @@ const ( ) var ( - // ErrDataChannelTimeout is returned when the datachannel fails to open within the timeout. - ErrDataChannelTimeout = errors.New("datachannel timeout") - // ErrDataChannelNotReady is returned when the datachannel is not open. + ErrDataChannelTimeout = errors.New("datachannel timeout") ErrDataChannelNotReady = errors.New("datachannel not ready") - // ErrSendQueueClosed is returned when the send queue is closed. - ErrSendQueueClosed = errors.New("send queue closed") - // ErrSendQueueTimeout is returned when sending to the queue times out. - ErrSendQueueTimeout = errors.New("send queue timeout") - // ErrSessionClosed is returned when the session is closed. - ErrSessionClosed = errors.New("session closed") - // ErrPeerClosed is returned when the peer is closed. - ErrPeerClosed = errors.New("peer closed") + ErrSendQueueClosed = errors.New("send queue closed") + ErrSendQueueTimeout = errors.New("send queue timeout") + ErrSessionClosed = errors.New("session closed") + ErrPeerClosed = errors.New("peer closed") ) type TrafficShape struct { //nolint:revive diff --git a/internal/provider/telemost/provider.go b/internal/provider/telemost/provider.go new file mode 100644 index 0000000..e526cd9 --- /dev/null +++ b/internal/provider/telemost/provider.go @@ -0,0 +1,66 @@ +package telemost + +import ( + "context" + "fmt" + + "github.com/openlibrecommunity/olcrtc/internal/provider" + "github.com/pion/webrtc/v4" +) + +type telemostProvider struct { + peer *Peer +} + +func New(ctx context.Context, cfg provider.Config) (provider.Provider, error) { + peer, err := NewPeer(ctx, cfg.RoomURL, cfg.Name, cfg.OnData) + if err != nil { + return nil, fmt.Errorf("create telemost peer: %w", err) + } + + return &telemostProvider{peer: peer}, nil +} + +func (t *telemostProvider) Connect(ctx context.Context) error { + return t.peer.Connect(ctx) +} + +func (t *telemostProvider) Send(data []byte) error { + return t.peer.Send(data) +} + +func (t *telemostProvider) Close() error { + return t.peer.Close() +} + +func (t *telemostProvider) SetReconnectCallback(cb func(*webrtc.DataChannel)) { + t.peer.SetReconnectCallback(cb) +} + +func (t *telemostProvider) SetShouldReconnect(fn func() bool) { + t.peer.SetShouldReconnect(fn) +} + +func (t *telemostProvider) SetEndedCallback(cb func(string)) { + t.peer.SetEndedCallback(cb) +} + +func (t *telemostProvider) WatchConnection(ctx context.Context) { + t.peer.WatchConnection(ctx) +} + +func (t *telemostProvider) CanSend() bool { + return t.peer.CanSend() +} + +func (t *telemostProvider) GetSendQueue() chan []byte { + return t.peer.GetSendQueue() +} + +func (t *telemostProvider) GetBufferedAmount() uint64 { + return t.peer.GetBufferedAmount() +} + +func init() { + provider.Register("telemost", New) +} diff --git a/internal/server/server.go b/internal/server/server.go index a14c7f3..08058ed 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -20,7 +20,9 @@ import ( "github.com/openlibrecommunity/olcrtc/internal/logger" "github.com/openlibrecommunity/olcrtc/internal/mux" "github.com/openlibrecommunity/olcrtc/internal/names" - "github.com/openlibrecommunity/olcrtc/internal/telemost" + "github.com/openlibrecommunity/olcrtc/internal/provider" + _ "github.com/openlibrecommunity/olcrtc/internal/provider/jazz" + _ "github.com/openlibrecommunity/olcrtc/internal/provider/telemost" "github.com/pion/webrtc/v4" ) @@ -41,8 +43,8 @@ var ( ErrEncryptFailed = errors.New("encrypt failed") ) -type Server struct { //nolint:revive - peers []*telemost.Peer +type Server struct { + peers []provider.Provider cipher *crypto.Cipher mux *mux.Multiplexer connections map[uint16]net.Conn @@ -64,9 +66,9 @@ type ConnectRequest struct { //nolint:revive Port int `json:"port"` } -// Run starts the olcrtc server and listens for client connections. func Run( ctx context.Context, + providerName, roomURL, keyHex string, dnsServer, @@ -85,7 +87,7 @@ func Run( cipher: cipher, connections: make(map[uint16]net.Conn), streamPumps: make(map[uint16]net.Conn), - peers: make([]*telemost.Peer, 0), + peers: make([]provider.Provider, 0), dnsServer: dnsServer, socksProxyAddr: socksProxyAddr, socksProxyPort: socksProxyPort, @@ -100,7 +102,7 @@ func Run( const peerCount = 1 for i := range peerCount { - if err := s.addPeer(runCtx, roomURL, i, cancel); err != nil { + if err := s.addPeer(runCtx, providerName, roomURL, i, cancel); err != nil { return fmt.Errorf("addPeer failed: %w", err) } } @@ -182,8 +184,15 @@ func (s *Server) setupMux() { }) } -func (s *Server) addPeer(ctx context.Context, roomURL string, peerID int, cancel context.CancelFunc) error { - peer, err := telemost.NewPeer(ctx, roomURL, names.Generate(), s.onData) +func (s *Server) addPeer(ctx context.Context, providerName, roomURL string, peerID int, cancel context.CancelFunc) error { + peer, err := provider.New(ctx, providerName, provider.Config{ + RoomURL: roomURL, + Name: names.Generate(), + OnData: s.onData, + DNSServer: s.dnsServer, + ProxyAddr: s.socksProxyAddr, + ProxyPort: s.socksProxyPort, + }) if err != nil { return fmt.Errorf("failed to create peer: %w", err) } @@ -198,7 +207,7 @@ func (s *Server) addPeer(ctx context.Context, roomURL string, peerID int, cancel s.handlePeerReconnect(peerID, dc) }) - log.Printf("Connecting peer %d to Telemost...", peerID) + log.Printf("Connecting peer %d to %s...", peerID, providerName) if err := peer.Connect(ctx); err != nil { return fmt.Errorf("failed to connect peer: %w", err) }