package telemost import ( "context" "fmt" "log" "strings" "time" "github.com/google/uuid" "github.com/gorilla/websocket" "github.com/pion/webrtc/v4" ) type Peer struct { roomURL string name string conn *ConnectionInfo ws *websocket.Conn pcSub *webrtc.PeerConnection pcPub *webrtc.PeerConnection dc *webrtc.DataChannel onData func([]byte) reconnectCh chan struct{} closeCh chan struct{} } func NewPeer(roomURL, name string, onData func([]byte)) (*Peer, error) { conn, err := GetConnectionInfo(roomURL, name) if err != nil { return nil, err } return &Peer{ roomURL: roomURL, name: name, conn: conn, onData: onData, reconnectCh: make(chan struct{}, 1), closeCh: make(chan struct{}), }, nil } func (p *Peer) Connect(ctx context.Context) error { config := webrtc.Configuration{ ICEServers: []webrtc.ICEServer{ {URLs: []string{"stun:stun.rtc.yandex.net:3478"}}, }, SDPSemantics: webrtc.SDPSemanticsUnifiedPlan, } settingEngine := webrtc.SettingEngine{} api := webrtc.NewAPI(webrtc.WithSettingEngine(settingEngine)) var err error p.pcSub, err = api.NewPeerConnection(config) if err != nil { return err } p.pcPub, err = api.NewPeerConnection(config) if err != nil { return err } p.dc, err = p.pcPub.CreateDataChannel("olcrtc", nil) if err != nil { return err } dcReady := make(chan struct{}) p.dc.OnOpen(func() { close(dcReady) }) p.dc.OnMessage(func(msg webrtc.DataChannelMessage) { if p.onData != nil && len(msg.Data) > 0 { p.onData(msg.Data) } }) p.pcSub.OnDataChannel(func(dc *webrtc.DataChannel) { log.Printf("Received datachannel: %s", dc.Label()) dc.OnMessage(func(msg webrtc.DataChannelMessage) { if p.onData != nil && len(msg.Data) > 0 { p.onData(msg.Data) } }) }) ws, _, err := websocket.DefaultDialer.Dial(p.conn.ClientConfig.MediaServerURL, nil) if err != nil { return err } p.ws = ws ws.SetPongHandler(func(string) error { ws.SetReadDeadline(time.Now().Add(60 * time.Second)) return nil }) go p.keepAlive() if err := p.sendHello(); err != nil { return err } p.setupICEHandlers() go p.handleSignaling() select { case <-dcReady: return nil case <-time.After(15 * time.Second): return fmt.Errorf("datachannel timeout") case <-ctx.Done(): return ctx.Err() } } func (p *Peer) Send(data []byte) error { return p.dc.Send(data) } func (p *Peer) sendHello() error { hello := map[string]interface{}{ "uid": uuid.New().String(), "hello": map[string]interface{}{ "participantMeta": map[string]interface{}{ "name": p.name, "role": "SPEAKER", "sendAudio": false, "sendVideo": false, }, "participantAttributes": map[string]interface{}{ "name": p.name, "role": "SPEAKER", }, "sendAudio": false, "sendVideo": false, "sendSharing": false, "participantId": p.conn.PeerID, "roomId": p.conn.RoomID, "serviceName": "telemost", "credentials": p.conn.Credentials, "capabilitiesOffer": map[string]interface{}{ "offerAnswerMode": []string{"SEPARATE"}, "initialSubscriberOffer": []string{"ON_HELLO"}, "slotsMode": []string{"FROM_CONTROLLER"}, "simulcastMode": []string{"DISABLED"}, "selfVadStatus": []string{"FROM_SERVER"}, "dataChannelSharing": []string{"TO_RTP"}, }, "sdkInfo": map[string]interface{}{ "implementation": "go", "version": "1.0.0", "userAgent": "OlcRTC-" + p.name, }, "sdkInitializationId": uuid.New().String(), "disablePublisher": false, "disableSubscriber": false, }, } return p.ws.WriteJSON(hello) } func (p *Peer) handleSignaling() { pubSent := false for { var msg map[string]interface{} if err := p.ws.ReadJSON(&msg); err != nil { log.Printf("WS read error: %v", err) select { case p.reconnectCh <- struct{}{}: default: } return } uid, _ := msg["uid"].(string) if _, ok := msg["serverHello"]; ok { p.sendAck(uid) } if offer, ok := msg["subscriberSdpOffer"].(map[string]interface{}); ok && !pubSent { sdp, _ := offer["sdp"].(string) pcSeq, _ := offer["pcSeq"].(float64) if err := p.pcSub.SetRemoteDescription(webrtc.SessionDescription{ Type: webrtc.SDPTypeOffer, SDP: sdp, }); err != nil { log.Printf("SetRemoteDescription error: %v", err) continue } answer, err := p.pcSub.CreateAnswer(nil) if err != nil { log.Printf("CreateAnswer error: %v", err) continue } if err := p.pcSub.SetLocalDescription(answer); err != nil { log.Printf("SetLocalDescription error: %v", err) continue } p.ws.WriteJSON(map[string]interface{}{ "uid": uuid.New().String(), "subscriberSdpAnswer": map[string]interface{}{ "pcSeq": int(pcSeq), "sdp": answer.SDP, }, }) p.sendAck(uid) time.Sleep(300 * time.Millisecond) pubOffer, err := p.pcPub.CreateOffer(nil) if err != nil { log.Printf("CreateOffer error: %v", err) continue } if err := p.pcPub.SetLocalDescription(pubOffer); err != nil { log.Printf("SetLocalDescription error: %v", err) continue } p.ws.WriteJSON(map[string]interface{}{ "uid": uuid.New().String(), "publisherSdpOffer": map[string]interface{}{ "pcSeq": 1, "sdp": pubOffer.SDP, }, }) pubSent = true } if answer, ok := msg["publisherSdpAnswer"].(map[string]interface{}); ok { sdp, _ := answer["sdp"].(string) if err := p.pcPub.SetRemoteDescription(webrtc.SessionDescription{ Type: webrtc.SDPTypeAnswer, SDP: sdp, }); err != nil { log.Printf("SetRemoteDescription error: %v", err) } p.sendAck(uid) } if cand, ok := msg["webrtcIceCandidate"].(map[string]interface{}); ok { p.handleICE(cand) } } } func (p *Peer) handleICE(cand map[string]interface{}) { candStr, _ := cand["candidate"].(string) target, _ := cand["target"].(string) sdpMid, _ := cand["sdpMid"].(string) sdpMLineIndex, _ := cand["sdpMlineIndex"].(float64) parts := strings.Fields(candStr) if len(parts) < 8 { return } init := webrtc.ICECandidateInit{ Candidate: candStr, SDPMid: &sdpMid, SDPMLineIndex: func() *uint16 { v := uint16(sdpMLineIndex); return &v }(), } if target == "SUBSCRIBER" { p.pcSub.AddICECandidate(init) } else if target == "PUBLISHER" { p.pcPub.AddICECandidate(init) } } func (p *Peer) sendAck(uid string) { p.ws.WriteJSON(map[string]interface{}{ "uid": uid, "ack": map[string]interface{}{ "status": map[string]interface{}{ "code": "OK", }, }, }) } func (p *Peer) setupICEHandlers() { p.pcSub.OnICECandidate(func(c *webrtc.ICECandidate) { if c == nil { return } init := c.ToJSON() p.ws.WriteJSON(map[string]interface{}{ "uid": uuid.New().String(), "webrtcIceCandidate": map[string]interface{}{ "candidate": init.Candidate, "sdpMid": init.SDPMid, "sdpMlineIndex": init.SDPMLineIndex, "target": "SUBSCRIBER", "pcSeq": 1, }, }) }) p.pcPub.OnICECandidate(func(c *webrtc.ICECandidate) { if c == nil { return } init := c.ToJSON() p.ws.WriteJSON(map[string]interface{}{ "uid": uuid.New().String(), "webrtcIceCandidate": map[string]interface{}{ "candidate": init.Candidate, "sdpMid": init.SDPMid, "sdpMlineIndex": init.SDPMLineIndex, "target": "PUBLISHER", "pcSeq": 1, }, }) }) } func (p *Peer) Close() error { close(p.closeCh) if p.ws != nil { p.ws.Close() } if p.pcSub != nil { p.pcSub.Close() } if p.pcPub != nil { p.pcPub.Close() } return nil } func (p *Peer) keepAlive() { ticker := time.NewTicker(30 * time.Second) defer ticker.Stop() for { select { case <-ticker.C: if p.ws != nil { if err := p.ws.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(10*time.Second)); err != nil { log.Printf("Ping error: %v", err) select { case p.reconnectCh <- struct{}{}: default: } return } } case <-p.closeCh: return } } } func (p *Peer) reconnect(ctx context.Context) error { log.Println("Reconnecting...") if p.ws != nil { p.ws.Close() } if p.pcSub != nil { p.pcSub.Close() } if p.pcPub != nil { p.pcPub.Close() } time.Sleep(2 * time.Second) conn, err := GetConnectionInfo(p.roomURL, p.name) if err != nil { return err } p.conn = conn return p.Connect(ctx) } func (p *Peer) WatchConnection(ctx context.Context) { for { select { case <-p.reconnectCh: for { if err := p.reconnect(ctx); err != nil { log.Printf("Reconnect failed: %v, retrying in 5s...", err) time.Sleep(5 * time.Second) continue } log.Println("Reconnected successfully") break } case <-p.closeCh: return case <-ctx.Done(): return } } }