feat(provider): abstract provider interface and add jazz support

This commit is contained in:
zarazaex69
2026-04-14 01:10:57 +03:00
parent eca3fcad93
commit 6e6265799a
11 changed files with 933 additions and 38 deletions

View File

@@ -16,6 +16,9 @@ import (
"github.com/openlibrecommunity/olcrtc/internal/client"
"github.com/openlibrecommunity/olcrtc/internal/logger"
"github.com/openlibrecommunity/olcrtc/internal/names"
"github.com/openlibrecommunity/olcrtc/internal/provider"
_ "github.com/openlibrecommunity/olcrtc/internal/provider/jazz"
_ "github.com/openlibrecommunity/olcrtc/internal/provider/telemost"
"github.com/openlibrecommunity/olcrtc/internal/server"
)
@@ -34,9 +37,10 @@ type config struct {
}
var (
errUnsupportedProvider = errors.New("only telemost provider supported")
errRoomIDRequired = errors.New("room ID required")
errModeRequired = errors.New("specify -mode srv or -mode cnc")
errProviderRequired = errors.New("provider required (use -provider telemost or -provider jazz)")
errUnsupportedProvider = errors.New("unsupported provider")
)
func main() {
@@ -86,8 +90,8 @@ func parseFlags() config {
cfg := config{}
flag.StringVar(&cfg.mode, "mode", "", "Mode: srv or cnc")
flag.StringVar(&cfg.roomID, "id", "", "Telemost room ID")
flag.StringVar(&cfg.provider, "provider", "telemost", "Provider (telemost only)")
flag.StringVar(&cfg.roomID, "id", "", "Room ID")
flag.StringVar(&cfg.provider, "provider", "", "Provider: telemost or jazz (required)")
flag.IntVar(&cfg.socksPort, "socks-port", 1080, "SOCKS5 port (client only)")
flag.StringVar(&cfg.socksHost, "socks-host", "127.0.0.1", "SOCKS5 listen host (client only)")
flag.StringVar(&cfg.keyHex, "key", "", "Shared encryption key (hex)")
@@ -112,9 +116,20 @@ func configureLogging(debug bool) {
}
func validateConfig(cfg config) error {
available := provider.Available()
validProvider := false
for _, p := range available {
if cfg.provider == p {
validProvider = true
break
}
}
switch {
case cfg.provider != "telemost":
return errUnsupportedProvider
case cfg.provider == "":
return errProviderRequired
case !validProvider:
return fmt.Errorf("%w: %s (available: %v)", errUnsupportedProvider, cfg.provider, available)
case cfg.roomID == "":
return errRoomIDRequired
case cfg.mode != "srv" && cfg.mode != "cnc":
@@ -148,12 +163,13 @@ func loadNames(dataDir string) error {
}
func runMode(ctx context.Context, cfg config, errCh chan<- error) {
roomURL := "https://telemost.yandex.ru/j/" + cfg.roomID
roomURL := buildRoomURL(cfg.provider, cfg.roomID)
switch cfg.mode {
case "srv":
errCh <- server.Run(
ctx,
cfg.provider,
roomURL,
cfg.keyHex,
cfg.dnsServer,
@@ -163,6 +179,7 @@ func runMode(ctx context.Context, cfg config, errCh chan<- error) {
case "cnc":
errCh <- client.Run(
ctx,
cfg.provider,
roomURL,
cfg.keyHex,
cfg.socksPort,
@@ -173,6 +190,17 @@ func runMode(ctx context.Context, cfg config, errCh chan<- error) {
}
}
func buildRoomURL(providerName, roomID string) string {
switch providerName {
case "telemost":
return "https://telemost.yandex.ru/j/" + roomID
case "jazz":
return roomID
default:
return roomID
}
}
func waitForShutdown(errCh <-chan error) error {
done := make(chan error, 1)
go func() {

View File

@@ -21,7 +21,9 @@ import (
"github.com/openlibrecommunity/olcrtc/internal/logger"
"github.com/openlibrecommunity/olcrtc/internal/mux"
"github.com/openlibrecommunity/olcrtc/internal/names"
"github.com/openlibrecommunity/olcrtc/internal/telemost"
"github.com/openlibrecommunity/olcrtc/internal/provider"
_ "github.com/openlibrecommunity/olcrtc/internal/provider/jazz"
_ "github.com/openlibrecommunity/olcrtc/internal/provider/telemost"
"github.com/pion/webrtc/v4"
)
@@ -31,9 +33,8 @@ var (
errNoConnectedPeers = errors.New("no connected peers available")
)
// Client manages the client-side mux and SOCKS5 listener.
type Client struct {
peers []*telemost.Peer
peers []provider.Provider
cipher *crypto.Cipher
mux *mux.Multiplexer
clientID uint32
@@ -43,9 +44,9 @@ type Client struct {
const defaultSOCKSListenHost = "127.0.0.1"
// Run starts the client and listens for SOCKS5 traffic.
func Run(
ctx context.Context,
providerName,
roomURL,
keyHex string,
socksPort int,
@@ -53,12 +54,12 @@ func Run(
socksUser,
socksPass string,
) error {
return RunWithReady(ctx, roomURL, keyHex, socksPort, socksHost, socksUser, socksPass, nil)
return RunWithReady(ctx, providerName, roomURL, keyHex, socksPort, socksHost, socksUser, socksPass, nil)
}
// RunWithReady starts the client and invokes onReady once the local SOCKS5 listener is accepting connections.
func RunWithReady(
ctx context.Context,
providerName,
roomURL,
keyHex string,
socksPort int,
@@ -88,13 +89,13 @@ func RunWithReady(
c := &Client{
cipher: cipher,
clientID: uint32(time.Now().UnixNano() & 0xFFFFFFFF),
peers: make([]*telemost.Peer, 0, 1),
peers: make([]provider.Provider, 0, 1),
}
c.mux = mux.New(c.clientID, c.sendFrame)
for peerID := range 1 {
if err := c.addPeer(runCtx, roomURL, peerID, cancel); err != nil {
if err := c.addPeer(runCtx, providerName, roomURL, peerID, cancel); err != nil {
return fmt.Errorf("addPeer failed: %w", err)
}
}
@@ -152,7 +153,7 @@ func (c *Client) sendFrame(frame []byte) error {
return nil
}
func waitUntilPeersCanSend(peers []*telemost.Peer) {
func waitUntilPeersCanSend(peers []provider.Provider) {
for {
canSend := true
for _, peer := range peers {
@@ -170,7 +171,7 @@ func waitUntilPeersCanSend(peers []*telemost.Peer) {
}
}
func (c *Client) nextPeer() (*telemost.Peer, error) {
func (c *Client) nextPeer() (provider.Provider, error) {
switch len(c.peers) {
case 0:
return nil, errNoConnectedPeers
@@ -183,11 +184,16 @@ func (c *Client) nextPeer() (*telemost.Peer, error) {
func (c *Client) addPeer(
runCtx context.Context,
providerName,
roomURL string,
peerID int,
cancel context.CancelFunc,
) error {
peer, err := telemost.NewPeer(runCtx, roomURL, names.Generate(), c.onData)
peer, err := provider.New(runCtx, providerName, provider.Config{
RoomURL: roomURL,
Name: names.Generate(),
OnData: c.onData,
})
if err != nil {
return fmt.Errorf("create peer %d: %w", peerID, err)
}
@@ -203,7 +209,7 @@ func (c *Client) addPeer(
c.peers = append(c.peers, peer)
log.Printf("Connecting peer %d to Telemost...", peerID)
log.Printf("Connecting peer %d to %s...", peerID, providerName)
if err := peer.Connect(runCtx); err != nil {
return fmt.Errorf("connect peer %d: %w", peerID, err)
}

View File

@@ -0,0 +1,13 @@
package provider
import "errors"
var (
ErrProviderNotFound = errors.New("provider not found")
ErrDataChannelTimeout = errors.New("datachannel timeout")
ErrDataChannelNotReady = errors.New("datachannel not ready")
ErrSendQueueClosed = errors.New("send queue closed")
ErrSendQueueTimeout = errors.New("send queue timeout")
ErrSessionClosed = errors.New("session closed")
ErrPeerClosed = errors.New("peer closed")
)

View File

@@ -0,0 +1,132 @@
package jazz
import (
"bytes"
"context"
"encoding/json"
"fmt"
"net/http"
"github.com/google/uuid"
"github.com/openlibrecommunity/olcrtc/internal/protect"
)
const apiBase = "https://bk.salutejazz.ru"
type RoomInfo struct {
RoomID string `json:"roomId"`
Password string `json:"password"`
ConnectorURL string `json:"connectorUrl"`
}
func createRoom(ctx context.Context) (*RoomInfo, error) {
clientID := uuid.New().String()
headers := map[string]string{
"X-Jazz-ClientId": clientID,
"X-Jazz-AuthType": "ANONYMOUS",
"X-Client-AuthType": "ANONYMOUS",
"Content-Type": "application/json",
}
createPayload := map[string]any{
"title": "olcrtc",
"guestEnabled": true,
"lobbyEnabled": false,
"serverVideoRecordAutoStartEnabled": false,
"sipEnabled": false,
"moderatorEmails": []string{},
"summarizationEnabled": false,
"room3dEnabled": false,
"room3dScene": "XRLobby",
}
body, err := json.Marshal(createPayload)
if err != nil {
return nil, fmt.Errorf("marshal create payload: %w", err)
}
req, err := http.NewRequestWithContext(ctx, http.MethodPost, apiBase+"/room/create-meeting",
bytes.NewReader(body))
if err != nil {
return nil, fmt.Errorf("create request: %w", err)
}
for k, v := range headers {
req.Header.Set(k, v)
}
client := protect.NewHTTPClient()
resp, err := client.Do(req)
if err != nil {
return nil, fmt.Errorf("do create request: %w", err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return nil, fmt.Errorf("create room failed: status %d", resp.StatusCode)
}
var createResp struct {
RoomID string `json:"roomId"`
Password string `json:"password"`
}
if err := json.NewDecoder(resp.Body).Decode(&createResp); err != nil {
return nil, fmt.Errorf("decode create response: %w", err)
}
preconnectPayload := map[string]any{
"password": createResp.Password,
"jazzNextMigration": map[string]any{
"b2bBaseRoomSupport": true,
"demoRoomBaseSupport": true,
"demoRoomVersionSupport": 2,
"mediaWithoutAutoSubscribeSupport": true,
"webinarSpeakerSupport": true,
"webinarViewerSupport": true,
"sdkRoomSupport": true,
"sberclassRoomSupport": true,
},
}
preBody, err := json.Marshal(preconnectPayload)
if err != nil {
return nil, fmt.Errorf("marshal preconnect payload: %w", err)
}
preReq, err := http.NewRequestWithContext(
ctx,
http.MethodPost,
fmt.Sprintf("%s/room/%s/preconnect", apiBase, createResp.RoomID),
bytes.NewReader(preBody),
)
if err != nil {
return nil, fmt.Errorf("create preconnect request: %w", err)
}
for k, v := range headers {
preReq.Header.Set(k, v)
}
preResp, err := client.Do(preReq)
if err != nil {
return nil, fmt.Errorf("do preconnect request: %w", err)
}
defer preResp.Body.Close()
if preResp.StatusCode != http.StatusOK {
return nil, fmt.Errorf("preconnect failed: status %d", preResp.StatusCode)
}
var preconnectResp struct {
ConnectorURL string `json:"connectorUrl"`
}
if err := json.NewDecoder(preResp.Body).Decode(&preconnectResp); err != nil {
return nil, fmt.Errorf("decode preconnect response: %w", err)
}
return &RoomInfo{
RoomID: createResp.RoomID,
Password: createResp.Password,
ConnectorURL: preconnectResp.ConnectorURL,
}, nil
}

View File

@@ -0,0 +1,528 @@
package jazz
import (
"context"
"fmt"
"log"
"sync"
"sync/atomic"
"time"
"github.com/google/uuid"
"github.com/gorilla/websocket"
"github.com/openlibrecommunity/olcrtc/internal/logger"
"github.com/openlibrecommunity/olcrtc/internal/protect"
"github.com/openlibrecommunity/olcrtc/internal/provider"
"github.com/pion/webrtc/v4"
)
type Peer struct {
name string
roomInfo *RoomInfo
ws *websocket.Conn
wsMu sync.Mutex
pcSub *webrtc.PeerConnection
pcPub *webrtc.PeerConnection
dc *webrtc.DataChannel
onData func([]byte)
onReconnect func(*webrtc.DataChannel)
shouldReconnect func() bool
reconnectCh chan struct{}
closeCh chan struct{}
closed atomic.Bool
reconnecting atomic.Bool
sendQueue chan []byte
sendQueueClosed atomic.Bool
onEnded func(string)
sessionCloseCh chan struct{}
sessionMu sync.Mutex
wg sync.WaitGroup
groupID string
}
func NewPeer(ctx context.Context, name string, onData func([]byte)) (*Peer, error) {
roomInfo, err := createRoom(ctx)
if err != nil {
return nil, fmt.Errorf("create room: %w", err)
}
log.Printf("Jazz room created: %s (password: %s)", roomInfo.RoomID, roomInfo.Password)
return &Peer{
name: name,
roomInfo: roomInfo,
onData: onData,
reconnectCh: make(chan struct{}, 1),
closeCh: make(chan struct{}),
sessionCloseCh: make(chan struct{}),
sendQueue: make(chan []byte, 5000),
}, nil
}
func (p *Peer) Connect(ctx context.Context) error {
p.closed.Store(false)
config := webrtc.Configuration{
ICEServers: []webrtc.ICEServer{},
SDPSemantics: webrtc.SDPSemanticsUnifiedPlan,
BundlePolicy: webrtc.BundlePolicyMaxBundle,
}
settingEngine := webrtc.SettingEngine{}
if protect.Protector != nil {
settingEngine.SetICEProxyDialer(protect.NewProxyDialer())
}
api := webrtc.NewAPI(webrtc.WithSettingEngine(settingEngine))
var err error
p.pcSub, err = api.NewPeerConnection(config)
if err != nil {
return fmt.Errorf("create subscriber pc: %w", err)
}
p.pcPub, err = api.NewPeerConnection(config)
if err != nil {
return fmt.Errorf("create publisher pc: %w", err)
}
p.dc, err = p.pcPub.CreateDataChannel("_reliable", &webrtc.DataChannelInit{
Ordered: func() *bool { v := true; return &v }(),
})
if err != nil {
return fmt.Errorf("create datachannel: %w", err)
}
dcReady := make(chan struct{})
p.setupDataChannelHandlers(dcReady)
if err := p.dialWebSocket(); err != nil {
return err
}
if err := p.sendJoin(); err != nil {
return err
}
p.wg.Add(1)
go func() {
defer p.wg.Done()
p.handleSignaling(ctx)
}()
select {
case <-dcReady:
return nil
case <-time.After(30 * time.Second):
return provider.ErrDataChannelTimeout
case <-ctx.Done():
return fmt.Errorf("connect cancelled: %w", ctx.Err())
}
}
func (p *Peer) dialWebSocket() error {
wsDialer := websocket.Dialer{
NetDialContext: protect.DialContext,
HandshakeTimeout: 15 * time.Second,
}
ws, resp, err := wsDialer.Dial(p.roomInfo.ConnectorURL, nil)
if err != nil {
return fmt.Errorf("dial websocket: %w", err)
}
if resp != nil && resp.Body != nil {
_ = resp.Body.Close()
}
p.ws = ws
ws.SetPongHandler(func(string) error {
_ = ws.SetReadDeadline(time.Now().Add(60 * time.Second))
return nil
})
_ = ws.SetReadDeadline(time.Now().Add(60 * time.Second))
return nil
}
func (p *Peer) sendJoin() error {
joinMsg := map[string]any{
"roomId": p.roomInfo.RoomID,
"event": "join",
"requestId": uuid.New().String(),
"payload": map[string]any{
"password": p.roomInfo.Password,
"participantName": p.name,
"supportedFeatures": map[string]any{
"attachedRooms": true,
"sessionGroups": true,
"transcription": true,
},
"isSilent": false,
},
}
p.wsMu.Lock()
defer p.wsMu.Unlock()
return p.ws.WriteJSON(joinMsg)
}
func (p *Peer) setupDataChannelHandlers(dcReady chan struct{}) {
p.dc.OnOpen(func() {
p.wg.Add(1)
go func() {
defer p.wg.Done()
p.processSendQueue()
}()
close(dcReady)
})
p.dc.OnClose(func() {
if !p.closed.Load() {
p.queueReconnect()
}
})
p.dc.OnMessage(func(msg webrtc.DataChannelMessage) {
if p.onData != nil && len(msg.Data) > 0 {
p.onData(msg.Data)
}
})
p.pcSub.OnDataChannel(func(dc *webrtc.DataChannel) {
dc.OnMessage(func(msg webrtc.DataChannelMessage) {
if p.onData != nil && len(msg.Data) > 0 {
p.onData(msg.Data)
}
})
})
}
func (p *Peer) handleSignaling(ctx context.Context) {
for {
var msg map[string]any
if err := p.ws.ReadJSON(&msg); err != nil {
logger.Debugf("ws read error: %v", err)
if !p.closed.Load() {
p.queueReconnect()
}
return
}
p.updateWSDeadline()
event, _ := msg["event"].(string)
payload, _ := msg["payload"].(map[string]any)
switch event {
case "join-response":
p.handleJoinResponse(payload)
case "media-out":
p.handleMediaOut(ctx, payload)
}
}
}
func (p *Peer) handleJoinResponse(payload map[string]any) {
group, _ := payload["participantGroup"].(map[string]any)
p.groupID, _ = group["groupId"].(string)
logger.Verbosef("Jazz peer joined: groupId=%s", p.groupID)
}
func (p *Peer) handleMediaOut(ctx context.Context, payload map[string]any) {
method, _ := payload["method"].(string)
switch method {
case "rtc:config":
p.handleRTCConfig(payload)
case "rtc:join":
logger.Verbosef("Jazz rtc:join received")
case "rtc:offer":
p.handleSubscriberOffer(ctx, payload)
case "rtc:answer":
p.handlePublisherAnswer(payload)
case "rtc:ice":
p.handleICE(payload)
}
}
func (p *Peer) handleRTCConfig(payload map[string]any) {
config, _ := payload["configuration"].(map[string]any)
servers, _ := config["iceServers"].([]any)
var iceServers []webrtc.ICEServer
for _, s := range servers {
server, _ := s.(map[string]any)
urls, _ := server["urls"].([]any)
username, _ := server["username"].(string)
credential, _ := server["credential"].(string)
var urlStrs []string
for _, u := range urls {
if urlStr, ok := u.(string); ok && urlStr != "" {
urlStrs = append(urlStrs, urlStr)
}
}
if len(urlStrs) > 0 {
iceServers = append(iceServers, webrtc.ICEServer{
URLs: urlStrs,
Username: username,
Credential: credential,
})
}
}
if len(iceServers) > 0 {
newConfig := webrtc.Configuration{
ICEServers: iceServers,
SDPSemantics: webrtc.SDPSemanticsUnifiedPlan,
BundlePolicy: webrtc.BundlePolicyMaxBundle,
}
_ = p.pcSub.SetConfiguration(newConfig)
_ = p.pcPub.SetConfiguration(newConfig)
}
}
func (p *Peer) handleSubscriberOffer(ctx context.Context, payload map[string]any) {
desc, _ := payload["description"].(map[string]any)
sdp, _ := desc["sdp"].(string)
if err := p.pcSub.SetRemoteDescription(webrtc.SessionDescription{
Type: webrtc.SDPTypeOffer,
SDP: sdp,
}); err != nil {
logger.Debugf("set remote desc error: %v", err)
return
}
answer, err := p.pcSub.CreateAnswer(nil)
if err != nil {
logger.Debugf("create answer error: %v", err)
return
}
if err := p.pcSub.SetLocalDescription(answer); err != nil {
logger.Debugf("set local desc error: %v", err)
return
}
p.wsMu.Lock()
_ = p.ws.WriteJSON(map[string]any{
"roomId": p.roomInfo.RoomID,
"event": "media-in",
"groupId": p.groupID,
"requestId": uuid.New().String(),
"payload": map[string]any{
"method": "rtc:answer",
"description": map[string]any{
"type": "answer",
"sdp": answer.SDP,
},
},
})
p.wsMu.Unlock()
time.Sleep(300 * time.Millisecond)
p.sendPublisherOffer()
}
func (p *Peer) sendPublisherOffer() {
offer, err := p.pcPub.CreateOffer(nil)
if err != nil {
logger.Debugf("create pub offer error: %v", err)
return
}
if err := p.pcPub.SetLocalDescription(offer); err != nil {
logger.Debugf("set local pub desc error: %v", err)
return
}
p.wsMu.Lock()
_ = p.ws.WriteJSON(map[string]any{
"roomId": p.roomInfo.RoomID,
"event": "media-in",
"groupId": p.groupID,
"requestId": uuid.New().String(),
"payload": map[string]any{
"method": "rtc:offer",
"description": map[string]any{
"type": "offer",
"sdp": offer.SDP,
},
},
})
p.wsMu.Unlock()
}
func (p *Peer) handlePublisherAnswer(payload map[string]any) {
desc, _ := payload["description"].(map[string]any)
sdp, _ := desc["sdp"].(string)
if err := p.pcPub.SetRemoteDescription(webrtc.SessionDescription{
Type: webrtc.SDPTypeAnswer,
SDP: sdp,
}); err != nil {
logger.Debugf("set remote pub desc error: %v", err)
}
}
func (p *Peer) handleICE(payload map[string]any) {
candidates, _ := payload["rtcIceCandidates"].([]any)
for _, c := range candidates {
cand, _ := c.(map[string]any)
candStr, _ := cand["candidate"].(string)
target, _ := cand["target"].(string)
sdpMid, _ := cand["sdpMid"].(string)
sdpMLineIndex, _ := cand["sdpMLineIndex"].(float64)
init := webrtc.ICECandidateInit{
Candidate: candStr,
SDPMid: &sdpMid,
SDPMLineIndex: func() *uint16 { v := uint16(sdpMLineIndex); return &v }(),
}
switch target {
case "SUBSCRIBER":
_ = p.pcSub.AddICECandidate(init)
case "PUBLISHER":
_ = p.pcPub.AddICECandidate(init)
}
}
}
func (p *Peer) updateWSDeadline() {
p.wsMu.Lock()
if p.ws != nil {
_ = p.ws.SetReadDeadline(time.Now().Add(60 * time.Second))
}
p.wsMu.Unlock()
}
func (p *Peer) Send(data []byte) error {
if p.dc == nil || p.dc.ReadyState() != webrtc.DataChannelStateOpen {
return provider.ErrDataChannelNotReady
}
if p.sendQueueClosed.Load() {
return provider.ErrSendQueueClosed
}
select {
case p.sendQueue <- data:
return nil
case <-time.After(50 * time.Millisecond):
return provider.ErrSendQueueTimeout
}
}
func (p *Peer) processSendQueue() {
for {
select {
case <-p.sessionCloseCh:
return
case <-p.closeCh:
return
case data := <-p.sendQueue:
if err := p.dc.Send(data); err != nil {
logger.Debugf("send error: %v", err)
p.queueReconnect()
return
}
time.Sleep(2 * time.Millisecond)
}
}
}
func (p *Peer) Close() error {
p.closed.Store(true)
p.sendQueueClosed.Store(true)
close(p.closeCh)
done := make(chan struct{})
go func() {
p.wg.Wait()
close(done)
}()
select {
case <-done:
case <-time.After(2 * time.Second):
}
if p.dc != nil {
_ = p.dc.Close()
}
if p.pcPub != nil {
_ = p.pcPub.Close()
}
if p.pcSub != nil {
_ = p.pcSub.Close()
}
if p.ws != nil {
p.wsMu.Lock()
_ = p.ws.WriteControl(websocket.CloseMessage,
websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""),
time.Now().Add(time.Second))
_ = p.ws.Close()
p.wsMu.Unlock()
}
return nil
}
func (p *Peer) SetReconnectCallback(cb func(*webrtc.DataChannel)) {
p.onReconnect = cb
}
func (p *Peer) SetShouldReconnect(fn func() bool) {
p.shouldReconnect = fn
}
func (p *Peer) SetEndedCallback(cb func(string)) {
p.onEnded = cb
}
func (p *Peer) WatchConnection(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
case <-p.closeCh:
return
case <-p.reconnectCh:
}
}
}
func (p *Peer) CanSend() bool {
if p.dc == nil || p.dc.ReadyState() != webrtc.DataChannelStateOpen {
return false
}
return len(p.sendQueue) < 4000
}
func (p *Peer) GetSendQueue() chan []byte {
return p.sendQueue
}
func (p *Peer) GetBufferedAmount() uint64 {
if p.dc != nil {
return p.dc.BufferedAmount()
}
return 0
}
func (p *Peer) queueReconnect() {
if p.closed.Load() || p.reconnecting.Load() {
return
}
if p.shouldReconnect != nil && !p.shouldReconnect() {
return
}
select {
case p.reconnectCh <- struct{}{}:
default:
}
}

View File

@@ -0,0 +1,66 @@
package jazz
import (
"context"
"fmt"
"github.com/openlibrecommunity/olcrtc/internal/provider"
"github.com/pion/webrtc/v4"
)
type jazzProvider struct {
peer *Peer
}
func New(ctx context.Context, cfg provider.Config) (provider.Provider, error) {
peer, err := NewPeer(ctx, cfg.Name, cfg.OnData)
if err != nil {
return nil, fmt.Errorf("create jazz peer: %w", err)
}
return &jazzProvider{peer: peer}, nil
}
func (j *jazzProvider) Connect(ctx context.Context) error {
return j.peer.Connect(ctx)
}
func (j *jazzProvider) Send(data []byte) error {
return j.peer.Send(data)
}
func (j *jazzProvider) Close() error {
return j.peer.Close()
}
func (j *jazzProvider) SetReconnectCallback(cb func(*webrtc.DataChannel)) {
j.peer.SetReconnectCallback(cb)
}
func (j *jazzProvider) SetShouldReconnect(fn func() bool) {
j.peer.SetShouldReconnect(fn)
}
func (j *jazzProvider) SetEndedCallback(cb func(string)) {
j.peer.SetEndedCallback(cb)
}
func (j *jazzProvider) WatchConnection(ctx context.Context) {
j.peer.WatchConnection(ctx)
}
func (j *jazzProvider) CanSend() bool {
return j.peer.CanSend()
}
func (j *jazzProvider) GetSendQueue() chan []byte {
return j.peer.GetSendQueue()
}
func (j *jazzProvider) GetBufferedAmount() uint64 {
return j.peer.GetBufferedAmount()
}
func init() {
provider.Register("jazz", New)
}

View File

@@ -0,0 +1,53 @@
package provider
import (
"context"
"github.com/pion/webrtc/v4"
)
type Provider interface {
Connect(ctx context.Context) error
Send(data []byte) error
Close() error
SetReconnectCallback(cb func(*webrtc.DataChannel))
SetShouldReconnect(fn func() bool)
SetEndedCallback(cb func(string))
WatchConnection(ctx context.Context)
CanSend() bool
GetSendQueue() chan []byte
GetBufferedAmount() uint64
}
type Config struct {
RoomURL string
Name string
OnData func([]byte)
DNSServer string
ProxyAddr string
ProxyPort int
}
type Factory func(ctx context.Context, cfg Config) (Provider, error)
var providers = make(map[string]Factory)
func Register(name string, factory Factory) {
providers[name] = factory
}
func New(ctx context.Context, name string, cfg Config) (Provider, error) {
factory, ok := providers[name]
if !ok {
return nil, ErrProviderNotFound
}
return factory(ctx, cfg)
}
func Available() []string {
names := make([]string, 0, len(providers))
for name := range providers {
names = append(names, name)
}
return names
}

View File

@@ -29,18 +29,12 @@ const (
)
var (
// ErrDataChannelTimeout is returned when the datachannel fails to open within the timeout.
ErrDataChannelTimeout = errors.New("datachannel timeout")
// ErrDataChannelNotReady is returned when the datachannel is not open.
ErrDataChannelTimeout = errors.New("datachannel timeout")
ErrDataChannelNotReady = errors.New("datachannel not ready")
// ErrSendQueueClosed is returned when the send queue is closed.
ErrSendQueueClosed = errors.New("send queue closed")
// ErrSendQueueTimeout is returned when sending to the queue times out.
ErrSendQueueTimeout = errors.New("send queue timeout")
// ErrSessionClosed is returned when the session is closed.
ErrSessionClosed = errors.New("session closed")
// ErrPeerClosed is returned when the peer is closed.
ErrPeerClosed = errors.New("peer closed")
ErrSendQueueClosed = errors.New("send queue closed")
ErrSendQueueTimeout = errors.New("send queue timeout")
ErrSessionClosed = errors.New("session closed")
ErrPeerClosed = errors.New("peer closed")
)
type TrafficShape struct { //nolint:revive

View File

@@ -0,0 +1,66 @@
package telemost
import (
"context"
"fmt"
"github.com/openlibrecommunity/olcrtc/internal/provider"
"github.com/pion/webrtc/v4"
)
type telemostProvider struct {
peer *Peer
}
func New(ctx context.Context, cfg provider.Config) (provider.Provider, error) {
peer, err := NewPeer(ctx, cfg.RoomURL, cfg.Name, cfg.OnData)
if err != nil {
return nil, fmt.Errorf("create telemost peer: %w", err)
}
return &telemostProvider{peer: peer}, nil
}
func (t *telemostProvider) Connect(ctx context.Context) error {
return t.peer.Connect(ctx)
}
func (t *telemostProvider) Send(data []byte) error {
return t.peer.Send(data)
}
func (t *telemostProvider) Close() error {
return t.peer.Close()
}
func (t *telemostProvider) SetReconnectCallback(cb func(*webrtc.DataChannel)) {
t.peer.SetReconnectCallback(cb)
}
func (t *telemostProvider) SetShouldReconnect(fn func() bool) {
t.peer.SetShouldReconnect(fn)
}
func (t *telemostProvider) SetEndedCallback(cb func(string)) {
t.peer.SetEndedCallback(cb)
}
func (t *telemostProvider) WatchConnection(ctx context.Context) {
t.peer.WatchConnection(ctx)
}
func (t *telemostProvider) CanSend() bool {
return t.peer.CanSend()
}
func (t *telemostProvider) GetSendQueue() chan []byte {
return t.peer.GetSendQueue()
}
func (t *telemostProvider) GetBufferedAmount() uint64 {
return t.peer.GetBufferedAmount()
}
func init() {
provider.Register("telemost", New)
}

View File

@@ -20,7 +20,9 @@ import (
"github.com/openlibrecommunity/olcrtc/internal/logger"
"github.com/openlibrecommunity/olcrtc/internal/mux"
"github.com/openlibrecommunity/olcrtc/internal/names"
"github.com/openlibrecommunity/olcrtc/internal/telemost"
"github.com/openlibrecommunity/olcrtc/internal/provider"
_ "github.com/openlibrecommunity/olcrtc/internal/provider/jazz"
_ "github.com/openlibrecommunity/olcrtc/internal/provider/telemost"
"github.com/pion/webrtc/v4"
)
@@ -41,8 +43,8 @@ var (
ErrEncryptFailed = errors.New("encrypt failed")
)
type Server struct { //nolint:revive
peers []*telemost.Peer
type Server struct {
peers []provider.Provider
cipher *crypto.Cipher
mux *mux.Multiplexer
connections map[uint16]net.Conn
@@ -64,9 +66,9 @@ type ConnectRequest struct { //nolint:revive
Port int `json:"port"`
}
// Run starts the olcrtc server and listens for client connections.
func Run(
ctx context.Context,
providerName,
roomURL,
keyHex string,
dnsServer,
@@ -85,7 +87,7 @@ func Run(
cipher: cipher,
connections: make(map[uint16]net.Conn),
streamPumps: make(map[uint16]net.Conn),
peers: make([]*telemost.Peer, 0),
peers: make([]provider.Provider, 0),
dnsServer: dnsServer,
socksProxyAddr: socksProxyAddr,
socksProxyPort: socksProxyPort,
@@ -100,7 +102,7 @@ func Run(
const peerCount = 1
for i := range peerCount {
if err := s.addPeer(runCtx, roomURL, i, cancel); err != nil {
if err := s.addPeer(runCtx, providerName, roomURL, i, cancel); err != nil {
return fmt.Errorf("addPeer failed: %w", err)
}
}
@@ -182,8 +184,15 @@ func (s *Server) setupMux() {
})
}
func (s *Server) addPeer(ctx context.Context, roomURL string, peerID int, cancel context.CancelFunc) error {
peer, err := telemost.NewPeer(ctx, roomURL, names.Generate(), s.onData)
func (s *Server) addPeer(ctx context.Context, providerName, roomURL string, peerID int, cancel context.CancelFunc) error {
peer, err := provider.New(ctx, providerName, provider.Config{
RoomURL: roomURL,
Name: names.Generate(),
OnData: s.onData,
DNSServer: s.dnsServer,
ProxyAddr: s.socksProxyAddr,
ProxyPort: s.socksProxyPort,
})
if err != nil {
return fmt.Errorf("failed to create peer: %w", err)
}
@@ -198,7 +207,7 @@ func (s *Server) addPeer(ctx context.Context, roomURL string, peerID int, cancel
s.handlePeerReconnect(peerID, dc)
})
log.Printf("Connecting peer %d to Telemost...", peerID)
log.Printf("Connecting peer %d to %s...", peerID, providerName)
if err := peer.Connect(ctx); err != nil {
return fmt.Errorf("failed to connect peer: %w", err)
}