diff --git a/go.mod b/go.mod index e37c661..467ff07 100644 --- a/go.mod +++ b/go.mod @@ -15,7 +15,7 @@ require ( github.com/xtaci/kcp-go/v5 v5.6.72 github.com/xtaci/smux v1.5.57 github.com/zarazaex69/gr v0.0.0-20260430043628-45b595f4fef0 - github.com/zarazaex69/j v0.0.0-20260515153314-5adb29c4fb86 + github.com/zarazaex69/j v0.0.0-20260515183030-8e13c23cdfdc golang.org/x/crypto v0.50.0 golang.org/x/mobile v0.0.0-20260410095206-2cfb76559b7b google.golang.org/genproto v0.0.0-20260209200024-4cfbd4190f57 diff --git a/go.sum b/go.sum index 50bae59..19ae7b8 100644 --- a/go.sum +++ b/go.sum @@ -235,8 +235,8 @@ github.com/xtaci/smux v1.5.57/go.mod h1:IGQ9QYrBphmb/4aTnLEcJby0TNr3NV+OslIOMrX8 github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= github.com/zarazaex69/gr v0.0.0-20260430043628-45b595f4fef0 h1:dMjHX/YPV3ZD/KJKFjQdlMBwj2/rZIuOVKOvGv26m9k= github.com/zarazaex69/gr v0.0.0-20260430043628-45b595f4fef0/go.mod h1:7vALI2tjaLTOGiDKV7V2JkVU9bA1YADBDQA6uvpp1ac= -github.com/zarazaex69/j v0.0.0-20260515153314-5adb29c4fb86 h1:uJGuIq9uk9TIo0+MItIyHPjBNf9xHqqZp7KyMv6DpIc= -github.com/zarazaex69/j v0.0.0-20260515153314-5adb29c4fb86/go.mod h1:7/ypJTenOIPx23fpo5uF7l4u+rxZqg9cFbTL/N77Ktc= +github.com/zarazaex69/j v0.0.0-20260515183030-8e13c23cdfdc h1:Nz6NuOZMNSMOujclXHE4a4/6Rb5Ivl1vMdmlXEV5GCg= +github.com/zarazaex69/j v0.0.0-20260515183030-8e13c23cdfdc/go.mod h1:7/ypJTenOIPx23fpo5uF7l4u+rxZqg9cFbTL/N77Ktc= github.com/zeebo/assert v1.3.0 h1:g7C04CbJuIDKNPFHmsk4hwZDO5O+kntRxzaUoNXj+IQ= github.com/zeebo/assert v1.3.0/go.mod h1:Pq9JiuJQpG8JLJdtkwrJESF0Foym2/D9XMU5ciN/wJ0= github.com/zeebo/xxh3 v1.1.0 h1:s7DLGDK45Dyfg7++yxI0khrfwq9661w9EN78eP/UZVs= diff --git a/internal/e2e/tunnel_test.go b/internal/e2e/tunnel_test.go index f710a83..205d6a7 100644 --- a/internal/e2e/tunnel_test.go +++ b/internal/e2e/tunnel_test.go @@ -374,9 +374,7 @@ func realE2ECaseExpectation(carrierName, transportName string) realE2EExpectatio // Jitsi colibri-ws bridge channel maps cleanly onto the // datachannel transport (raw bytes broadcast through // EndpointMessage). Video transports go through pion's - // PeerConnection negotiated via Jingle session-accept; results - // are bridge/instance dependent (some operators throttle or - // strip non-camera video), hence best-effort. + // PeerConnection negotiated via Jingle session-accept. return realE2EExpectPass default: return realE2EExpectPass diff --git a/internal/engine/jitsi/jitsi.go b/internal/engine/jitsi/jitsi.go index b984547..45160ff 100644 --- a/internal/engine/jitsi/jitsi.go +++ b/internal/engine/jitsi/jitsi.go @@ -18,6 +18,7 @@ package jitsi import ( "context" "encoding/base64" + "encoding/xml" "errors" "fmt" "strings" @@ -275,11 +276,9 @@ func (s *Session) negotiatePC(ctx context.Context, jSess *j.Session) error { webrtc.WithInterceptorRegistry(registry), ) - // Jicofo emits Plan B style SDP with separate sections per - // media kind and SSRC-keyed source descriptors. pion's default - // UnifiedPlan parser rejects this with "remote SessionDescription - // semantics does not match configuration", so we explicitly request - // Plan B for the conference PeerConnection. + // Jicofo emits Plan B style SDP. Explicit Plan B semantics match what + // the j library reference setup uses; source-add renegotiation drives + // reception of other participants' SSRCs on the same m=video section. pcConfig := jSess.IceConfig() pcConfig.SDPSemantics = webrtc.SDPSemanticsPlanB @@ -288,7 +287,16 @@ func (s *Session) negotiatePC(ctx context.Context, jSess *j.Session) error { return fmt.Errorf("new pc: %w", err) } + // Jicofo's session-initiate always includes m=audio. Without a matching + // audio transceiver, pion's answer rejects the audio m-line and JVB may + // not complete ICE for the second peer in the room. + if _, err := pc.AddTransceiverFromKind(webrtc.RTPCodecTypeAudio, webrtc.RTPTransceiverInit{Direction: webrtc.RTPTransceiverDirectionRecvonly}); err != nil { + _ = pc.Close() + return fmt.Errorf("add audio recvonly: %w", err) + } + s.videoTrackMu.RLock() + hasLocalTracks := len(s.videoTracks) > 0 for _, track := range s.videoTracks { if _, addErr := pc.AddTrack(track); addErr != nil { s.videoTrackMu.RUnlock() @@ -298,6 +306,19 @@ func (s *Session) negotiatePC(ctx context.Context, jSess *j.Session) error { } s.videoTrackMu.RUnlock() + // When sending video, AddTrack already creates the video m-line (sendonly). + // When only receiving, an explicit recvonly transceiver is required so the + // SDP answer includes a video m-line — without it JVB does not set up a + // video forwarding path and ICE stalls. Mirrors the j library reference CLI: + // AddTrack and AddTransceiverFromKind(video,recvonly) are mutually exclusive + // in Plan B; using both produces a malformed SDP. + if !hasLocalTracks { + if _, err := pc.AddTransceiverFromKind(webrtc.RTPCodecTypeVideo, webrtc.RTPTransceiverInit{Direction: webrtc.RTPTransceiverDirectionRecvonly}); err != nil { + _ = pc.Close() + return fmt.Errorf("add video recvonly: %w", err) + } + } + pc.OnTrack(func(track *webrtc.TrackRemote, recv *webrtc.RTPReceiver) { if track.Kind() != webrtc.RTPCodecTypeVideo { return @@ -315,10 +336,29 @@ func (s *Session) negotiatePC(ctx context.Context, jSess *j.Session) error { neg := jSess.Negotiator() neg.PC = pc + neg.OnIceConnectionStateChange = func(state webrtc.ICEConnectionState) { + logger.Debugf("jitsi ICE state: %s", state) + } if err := neg.Accept(ctx); err != nil { _ = pc.Close() return fmt.Errorf("session-accept: %w", err) } + logger.Debugf("jitsi: session-accept sent") + + // Announce our SSRCs explicitly via source-add. Even though session-accept + // already carries them, Jicofo only propagates sources advertised via + // source-add to peers that join AFTER us. + if hasLocalTracks { + if err := neg.SendSourceAddFromSDP(pc.LocalDescription().SDP); err != nil { + logger.Debugf("jitsi: source-add (initial): %v", err) + } + } + + // Drain XMPP stanzas: feed transport-info trickle ICE candidates into + // pion, handle incoming source-add (other participants' SSRCs), and + // keep the channel from filling its 64-slot buffer. + s.wg.Add(1) + go s.trickleDrainLoop(pc, neg, jSess.LowLevel().Stanzas()) // Tell JVB to forward video streams to this endpoint. if err := jSess.RequestVideo(ctx, 720); err != nil { @@ -331,6 +371,127 @@ func (s *Session) negotiatePC(ctx context.Context, jSess *j.Session) error { return nil } +// negotiator is the subset of *peer.Negotiator we need. Defined as an +// interface here because peer is in j's internal/ tree and not importable. +type negotiator interface { + HandleSourceAdd(stanza string) error +} + +// trickleDrainLoop reads the XMPP stanza channel and feeds any +// transport-info ICE candidates into the PeerConnection. It also drains +// non-jingle stanzas so the channel never fills and blocks the read loop. +// Incoming source-add stanzas (announcing other participants' SSRCs) are +// merged into the remote SDP via neg.HandleSourceAdd so pion can route the +// inbound RTP through OnTrack. +func (s *Session) trickleDrainLoop(pc *webrtc.PeerConnection, neg negotiator, stanzas <-chan string) { + defer s.wg.Done() + for { + select { + case <-s.done: + return + case raw, ok := <-stanzas: + if !ok { + return + } + switch { + case strings.Contains(raw, "transport-info"): + if err := s.applyTrickleICE(pc, raw); err != nil { + logger.Debugf("jitsi trickle ICE: %v", err) + } + case strings.Contains(raw, "source-add"): + if err := neg.HandleSourceAdd(raw); err != nil { + logger.Debugf("jitsi source-add: %v", err) + } + } + } + } +} + + +// xmlCandidate is a minimal XML representation of a Jingle ICE candidate. +type xmlCandidate struct { + Component string `xml:"component,attr"` + Foundation string `xml:"foundation,attr"` + Generation string `xml:"generation,attr"` + IP string `xml:"ip,attr"` + Port string `xml:"port,attr"` + Priority string `xml:"priority,attr"` + Protocol string `xml:"protocol,attr"` + Type string `xml:"type,attr"` + RelAddr string `xml:"rel-addr,attr"` + RelPort string `xml:"rel-port,attr"` +} + +// xmlTransportInfo is the minimal structure needed to extract candidates +// from a stanza. +type xmlTransportInfo struct { + XMLName xml.Name `xml:"iq"` + Jingle struct { + Action string `xml:"action,attr"` + Contents []struct { + Name string `xml:"name,attr"` + Transport struct { + Candidates []xmlCandidate `xml:"candidate"` + } `xml:"transport"` + } `xml:"content"` + } `xml:"jingle"` +} + +func (s *Session) applyTrickleICE(pc *webrtc.PeerConnection, raw string) error { + var ti xmlTransportInfo + if err := xml.Unmarshal([]byte(raw), &ti); err != nil { + return fmt.Errorf("parse transport-info: %w", err) + } + for _, content := range ti.Jingle.Contents { + mid := content.Name + for _, c := range content.Transport.Candidates { + sdpLine := buildSDPCandidate(c) + if sdpLine == "" { + continue + } + init := webrtc.ICECandidateInit{ + Candidate: sdpLine, + SDPMid: &mid, + } + if err := pc.AddICECandidate(init); err != nil { + logger.Debugf("jitsi add ICE candidate (%s): %v", mid, err) + } + } + } + return nil +} + +func buildSDPCandidate(c xmlCandidate) string { + if c.IP == "" || c.Port == "" { + return "" + } + comp := c.Component + if comp == "" { + comp = "1" + } + proto := strings.ToLower(c.Protocol) + if proto == "" { + proto = "udp" + } + priority := c.Priority + if priority == "" { + priority = "1" + } + candType := c.Type + if candType == "" { + candType = "host" + } + s := fmt.Sprintf("candidate:%s %s %s %s %s %s typ %s", + c.Foundation, comp, proto, priority, c.IP, c.Port, candType) + if c.RelAddr != "" && c.RelPort != "" { + s += fmt.Sprintf(" raddr %s rport %s", c.RelAddr, c.RelPort) + } + if c.Generation != "" { + s += fmt.Sprintf(" generation %s", c.Generation) + } + return s +} + // Send queues data for transmission over the bridge. // // Send is non-blocking: data is enqueued onto the engine's outbound channel @@ -459,9 +620,26 @@ func (s *Session) Close() error { return nil } + // Tell Jicofo we're leaving BEFORE closing any transport. The order + // matters: a half-torn-down websocket can drop the session-terminate / + // presence-unavailable stanzas, leaving the participant in the MUC + // roster until idle timeout. Subsequent tests then see ghost endpoints + // in the bridge channel and receive garbage during handshake. jSess := s.jSess.Load() if jSess != nil { - s.terminateJingleSession(jSess) + if err := s.terminateJingleSession(jSess); err != nil { + logger.Infof("jitsi: session-terminate failed: %v", err) + } + // Send MUC presence-unavailable and give Prosody a moment to + // route it before we tear down the websocket. + if conn := jSess.LowLevel(); conn != nil { + if err := conn.LeaveMUC(s.room); err != nil { + logger.Infof("jitsi: LeaveMUC failed: %v", err) + } else { + logger.Infof("jitsi: LeaveMUC sent") + } + time.Sleep(300 * time.Millisecond) + } } s.pcMu.Lock() @@ -501,14 +679,12 @@ func (s *Session) Close() error { // moment it dispatches session-initiate, regardless of whether the // participant ever sent session-accept, and an explicit session-terminate // frees that slot promptly. -func (s *Session) terminateJingleSession(jSess *j.Session) { +func (s *Session) terminateJingleSession(jSess *j.Session) error { neg := jSess.Negotiator() if neg == nil { - return - } - if err := neg.Terminate("success"); err != nil { - logger.Debugf("jitsi: session-terminate: %v", err) + return nil } + return neg.Terminate("success") } // SetReconnectCallback registers a callback for reconnection events. diff --git a/internal/transport/seichannel/transport.go b/internal/transport/seichannel/transport.go index 78203f3..6cb7f9b 100644 --- a/internal/transport/seichannel/transport.go +++ b/internal/transport/seichannel/transport.go @@ -3,7 +3,9 @@ package seichannel import ( "context" + "crypto/rand" "encoding/binary" + "encoding/hex" "errors" "fmt" "hash/crc32" @@ -124,15 +126,17 @@ func New(ctx context.Context, cfg transport.Config) (transport.Transport, error) return nil, fmt.Errorf("open video track: %w", err) } + // Stream/track IDs must be unique per peer — Jitsi rejects session-accept + // when msid collides with another participant in the conference. track, err := webrtc.NewTrackLocalStaticSample( webrtc.RTPCodecCapability{ MimeType: webrtc.MimeTypeH264, ClockRate: 90000, Channels: 0, - SDPFmtpLine: "level-asymmetry-allowed=1;packetization-mode=1;profile-level-id=42c00a", + SDPFmtpLine: "level-asymmetry-allowed=1;packetization-mode=1;profile-level-id=42e01f", }, - "seichannel", - "olcrtc", + "seichannel-"+randomID(), + "olcrtc-"+randomID(), ) if err != nil { return nil, fmt.Errorf("create local video track: %w", err) @@ -610,3 +614,14 @@ func decodeTransportFrame(data []byte) (transportFrame, error) { return transportFrame{}, ErrUnexpectedFrameType } } + +// randomID returns 8 random hex characters for use as a per-peer suffix on +// track and stream IDs. Required for Jitsi: msid collisions between +// participants cause Jicofo to reject session-accept. +func randomID() string { + var b [4]byte + if _, err := rand.Read(b[:]); err != nil { + return fmt.Sprintf("%08x", time.Now().UnixNano()) + } + return hex.EncodeToString(b[:]) +} diff --git a/internal/transport/videochannel/transport.go b/internal/transport/videochannel/transport.go index 2c35b33..8468100 100644 --- a/internal/transport/videochannel/transport.go +++ b/internal/transport/videochannel/transport.go @@ -3,6 +3,8 @@ package videochannel import ( "context" + "crypto/rand" + "encoding/hex" "errors" "fmt" "hash/crc32" @@ -104,7 +106,10 @@ func New(ctx context.Context, cfg transport.Config) (transport.Transport, error) } codec := codecSpecForCarrier(cfg.Carrier) - track, err := webrtc.NewTrackLocalStaticSample(codec.capability, "videochannel", "olcrtc") + // Stream/track IDs must be unique per peer: Jitsi/Jicofo keys participant + // sources by msid (stream-id+track-id) and rejects a session-accept whose + // msid collides with one already in the conference. + track, err := webrtc.NewTrackLocalStaticSample(codec.capability, "videochannel-"+randomID(), "olcrtc-"+randomID()) if err != nil { return nil, fmt.Errorf("create local video track: %w", err) } @@ -632,3 +637,14 @@ func (p *streamTransport) resolveAck(seq, crc uint32) { default: } } + +// randomID returns 8 random hex characters for use as a per-peer suffix on +// track and stream IDs. Required for Jitsi: msid collisions between +// participants cause Jicofo to reject session-accept. +func randomID() string { + var b [4]byte + if _, err := rand.Read(b[:]); err != nil { + return fmt.Sprintf("%08x", time.Now().UnixNano()) + } + return hex.EncodeToString(b[:]) +} diff --git a/internal/transport/vp8channel/transport.go b/internal/transport/vp8channel/transport.go index 6050616..5beacd5 100644 --- a/internal/transport/vp8channel/transport.go +++ b/internal/transport/vp8channel/transport.go @@ -29,6 +29,7 @@ import ( "context" "crypto/rand" "encoding/binary" + "encoding/hex" "errors" "fmt" "hash/crc32" @@ -141,13 +142,15 @@ func New(ctx context.Context, cfg transport.Config) (transport.Transport, error) return nil, fmt.Errorf("open video track: %w", err) } + // Stream/track IDs must be unique per peer — Jitsi rejects session-accept + // when msid collides with another participant in the conference. track, err := webrtc.NewTrackLocalStaticSample( webrtc.RTPCodecCapability{ MimeType: webrtc.MimeTypeVP8, ClockRate: 90000, }, - "vp8channel", - "olcrtc", + "vp8channel-"+randomID(), + "olcrtc-"+randomID(), ) if err != nil { return nil, fmt.Errorf("create local video track: %w", err) @@ -247,6 +250,17 @@ func bindingToken(clientID string) uint32 { return token } +// randomID returns 8 random hex characters for use as a per-peer suffix on +// track and stream IDs. Required for Jitsi: msid collisions between +// participants cause Jicofo to reject session-accept. +func randomID() string { + var b [4]byte + if _, err := rand.Read(b[:]); err != nil { + return fmt.Sprintf("%08x", time.Now().UnixNano()) + } + return hex.EncodeToString(b[:]) +} + func randomEpoch() uint32 { var b [4]byte if _, err := rand.Read(b[:]); err != nil {