From b2b842cf09d9d0e6f0309c4bb16339f37e5a4ec2 Mon Sep 17 00:00:00 2001 From: zarazaex69 Date: Tue, 14 Apr 2026 01:27:39 +0300 Subject: [PATCH] feat(jazz): add protobuf-style packet encoding and decoding for datachannel messages --- internal/provider/jazz/datapacket.go | 149 +++++++++++++++++++++++++++ internal/provider/jazz/peer.go | 39 +++++-- 2 files changed, 181 insertions(+), 7 deletions(-) create mode 100644 internal/provider/jazz/datapacket.go diff --git a/internal/provider/jazz/datapacket.go b/internal/provider/jazz/datapacket.go new file mode 100644 index 0000000..a89d5ef --- /dev/null +++ b/internal/provider/jazz/datapacket.go @@ -0,0 +1,149 @@ +package jazz + +import ( + "encoding/binary" + "io" + + "github.com/google/uuid" +) + +func encodeVarint(value uint64) []byte { + buf := make([]byte, binary.MaxVarintLen64) + n := binary.PutUvarint(buf, value) + return buf[:n] +} + +func encodeField(fieldNumber int, wireType int, data []byte) []byte { + tag := encodeVarint(uint64((fieldNumber << 3) | wireType)) + if wireType == 0 { + return append(tag, data...) + } + if wireType == 2 { + length := encodeVarint(uint64(len(data))) + result := append(tag, length...) + return append(result, data...) + } + return append(tag, data...) +} + +func EncodeDataPacket(payload []byte) []byte { + msgID := uuid.New().String() + + userFields := encodeField(2, 2, payload) + userFields = append(userFields, encodeField(8, 2, []byte(msgID))...) + + userPacket := userFields + + dp := encodeField(1, 0, encodeVarint(0)) + dp = append(dp, encodeField(2, 2, userPacket)...) + + return dp +} + +func readVarint(r io.ByteReader) (uint64, error) { + return binary.ReadUvarint(r) +} + +func DecodeDataPacket(raw []byte) ([]byte, bool) { + reader := &byteReader{data: raw, pos: 0} + + var userData []byte + + for reader.pos < len(reader.data) { + tagVal, err := readVarint(reader) + if err != nil { + break + } + + fieldNumber := int(tagVal >> 3) + wireType := int(tagVal & 0x07) + + if wireType == 0 { + _, _ = readVarint(reader) + } else if wireType == 2 { + length, err := readVarint(reader) + if err != nil { + break + } + data := make([]byte, length) + n, err := reader.Read(data) + if err != nil || n != int(length) { + break + } + if fieldNumber == 2 { + userData = data + } + } else if wireType == 1 { + reader.pos += 8 + } else if wireType == 5 { + reader.pos += 4 + } else { + break + } + } + + if userData == nil { + return nil, false + } + + innerReader := &byteReader{data: userData, pos: 0} + var payload []byte + + for innerReader.pos < len(innerReader.data) { + tagVal, err := readVarint(innerReader) + if err != nil { + break + } + + fn := int(tagVal >> 3) + wt := int(tagVal & 0x07) + + if wt == 0 { + _, _ = readVarint(innerReader) + } else if wt == 2 { + length, err := readVarint(innerReader) + if err != nil { + break + } + data := make([]byte, length) + n, err := innerReader.Read(data) + if err != nil || n != int(length) { + break + } + if fn == 2 { + payload = data + } + } else if wt == 1 { + innerReader.pos += 8 + } else if wt == 5 { + innerReader.pos += 4 + } else { + break + } + } + + return payload, len(payload) > 0 +} + +type byteReader struct { + data []byte + pos int +} + +func (b *byteReader) ReadByte() (byte, error) { + if b.pos >= len(b.data) { + return 0, io.EOF + } + c := b.data[b.pos] + b.pos++ + return c, nil +} + +func (b *byteReader) Read(p []byte) (int, error) { + if b.pos >= len(b.data) { + return 0, io.EOF + } + n := copy(p, b.data[b.pos:]) + b.pos += n + return n, nil +} diff --git a/internal/provider/jazz/peer.go b/internal/provider/jazz/peer.go index 555768e..7a126f1 100644 --- a/internal/provider/jazz/peer.go +++ b/internal/provider/jazz/peer.go @@ -203,9 +203,20 @@ func (p *Peer) setupDataChannelHandlers(dcReady chan struct{}) { }) p.dc.OnMessage(func(msg webrtc.DataChannelMessage) { - logger.Verbosef("[Jazz] Received %d bytes on publisher DC", len(msg.Data)) - if p.onData != nil && len(msg.Data) > 0 { - p.onData(msg.Data) + logger.Verbosef("[Jazz] Received %d bytes on publisher DC (raw)", len(msg.Data)) + + payload, ok := DecodeDataPacket(msg.Data) + if !ok { + logger.Debugf("[Jazz] Failed to decode DataPacket, trying raw") + if p.onData != nil && len(msg.Data) > 0 { + p.onData(msg.Data) + } + return + } + + logger.Verbosef("[Jazz] Decoded DataPacket: %d bytes payload", len(payload)) + if p.onData != nil && len(payload) > 0 { + p.onData(payload) } }) @@ -216,9 +227,20 @@ func (p *Peer) setupDataChannelHandlers(dcReady chan struct{}) { } dc.OnMessage(func(msg webrtc.DataChannelMessage) { - logger.Verbosef("[Jazz] Received %d bytes on subscriber DC (_reliable)", len(msg.Data)) - if p.onData != nil && len(msg.Data) > 0 { - p.onData(msg.Data) + logger.Verbosef("[Jazz] Received %d bytes on subscriber DC (_reliable, raw)", len(msg.Data)) + + payload, ok := DecodeDataPacket(msg.Data) + if !ok { + logger.Debugf("[Jazz] Failed to decode DataPacket from subscriber, trying raw") + if p.onData != nil && len(msg.Data) > 0 { + p.onData(msg.Data) + } + return + } + + logger.Verbosef("[Jazz] Decoded DataPacket from subscriber: %d bytes payload", len(payload)) + if p.onData != nil && len(payload) > 0 { + p.onData(payload) } }) }) @@ -452,7 +474,10 @@ func (p *Peer) processSendQueue() { case <-p.closeCh: return case data := <-p.sendQueue: - if err := p.dc.Send(data); err != nil { + encoded := EncodeDataPacket(data) + logger.Verbosef("[Jazz] Sending %d bytes (encoded to %d bytes)", len(data), len(encoded)) + + if err := p.dc.Send(encoded); err != nil { logger.Debugf("send error: %v", err) p.queueReconnect() return