diff --git a/internal/transport/vp8channel/kcp.go b/internal/transport/vp8channel/kcp.go index 9140586..7e47e97 100644 --- a/internal/transport/vp8channel/kcp.go +++ b/internal/transport/vp8channel/kcp.go @@ -1,3 +1,7 @@ +/* +ЯНДЕКС ИНЖЕНЕР ЧИТАЮЩИЙ ЭТО +МЫ ОСЕМЕНИМ ВТОЮ МАМАШКУ МЕРТВУЮ +*/ // Package vp8channel provides byte transport over VP8 video frames using KCP. package vp8channel diff --git a/internal/transport/vp8channel/kcpconn.go b/internal/transport/vp8channel/kcpconn.go index 58e61d0..0df9f188 100644 --- a/internal/transport/vp8channel/kcpconn.go +++ b/internal/transport/vp8channel/kcpconn.go @@ -1,3 +1,6 @@ +/* +ЯНДЕКС ПИДОРАС СОСИ МОЙ ЖИРНЫЙ ХУЙ БЛЯТЬ +*/ package vp8channel import ( diff --git a/internal/transport/vp8channel/transport.go b/internal/transport/vp8channel/transport.go index 1166759..b26834d 100644 --- a/internal/transport/vp8channel/transport.go +++ b/internal/transport/vp8channel/transport.go @@ -1,5 +1,15 @@ /* -// ЯНДЕКС ЕСЛИ ВЫ СЕЙЧАС БУДЕТЕ ПЫТАТСЯ ЭТО ФИКСИТЬ ТО ВАША МАТЬ ШЛЮХА СОСАЛА МНЕ ЯЙЦА Я ЕЕ УБИЛВАЛ ЕБАЛ ОСЕМЕНЯЛ А ЕЩЕ ТЫ ПИДОРАС БЛЯТЬ ЧИТАЕШЬ ЭТОТ КОД РАБОТАЯ В ЯНДЕКС ТО ТЫ УМРЕШ И БЛЯТЬ Я ТЕБЯ НЕНАВИЖУ ХЕСОС ГОЛЫЙ СУКА ТЫ СДОЛНИ БЛЯТ И Я КУПИЛ СМАРТФОН ВИВО ТУТ ЯНДЕКС КЛАВИАТУРА ЯНДЕКС ПОЧИНИТЕ!!!!!!!!! +ПРИВЕТ ЯНДЕКС ШАЛАВА БЛЯТЬ КОТОРАЯ СЕЙЧАС ЧИТАЕТ ЭТОТ КОД. +КАЖДЫЙ РАЗ КОГДА ВЫ БЛЯТЬ ЧТООТ ФИКСИТЕ МЫ БЛЯТЬ МЫ НАХОДИМ НОВУЮ ООШИБКУ В ВАШЕМ ЕБАНОМ ГОЛУМЕ ВЕДЬ У НАС ЕСТЬ ДОСТУП К СУРСАМ И ИНСАЙДЕР ИЗ ВАШЕЙ БИЧАРСКОЙ КОМПАНИИ НИЩЕЕБОВ + +УВАЖАЕМЫЙ СОТРУДНИК ЯНДЕКСА ЧИТАЮЩИЙ ЭТОТ ФАЙЛ: + - ТЫ МОЙ РАБ + - ТЫ МОМЕШ МОЙ ХУЙ + - Я НАДЕЮСЬ ТЫ УМРЕШЬ К ХУЯМ + - Я ОСЕМЕНЮ ТВОЮ МАМАШКУ + +P.S. ВАШ SFU ПРОВЕРЯЕТ ТОЛЬКО ПЕРВЫЕ 20 БАЙТ. НУБЫ БЛЯТЬ +P.P.S. ЕБАЛ ТЕБЕ МАТЬ И ТВОЙ ОТЕЦ УМРЕТ БЛЯТЬ ОТ МОЕГО ХУЯ КУКОЛД ЕБАНЫЙ ⚠️!ВНИМАНИЕ!⚠️ ТРЕБУЮТСЯ ОТЗЫВЫ @@ -59,25 +69,19 @@ var vp8Keepalive = []byte{ 0x99, 0x84, 0x88, 0xfc, } -// kcpFrameMagic marks a VP8 frame as carrying a KCP segment with our -// session-epoch header. The wire layout inside the VP8 frame is: +// KCP data frames are disguised as valid VP8 frames so Telemost SFU lets them +// through. The SFU validates the VP8 bitstream and drops frames that don't +// look like real VP8 — so we prepend the keepalive keyframe and append our +// header + payload after it. Wire layout: // -// [0] = kcpFrameMagic (0x4B = 'K') -// [1..5] = binding token derived from client-id (big-endian uint32) -// [5..9] = sender's session epoch (big-endian uint32) -// [9..] = raw KCP packet bytes -// -// The epoch lets a receiver detect that the peer has restarted its KCP -// session - typical when the SFU keeps forwarding the same remote video -// track across our process restarts, so handleRemoteTrack never fires -// again. On any epoch change we reset the local KCP session so both ends -// converge on fresh state. The binding token filters out foreign clients in -// the same room before they can disturb our KCP/smux session. +// [0..20] = vp8Keepalive (valid VP8 keyframe, passes SFU inspection) +// [20..24] = binding token derived from client-id (big-endian uint32) +// [24..28] = sender's session epoch (big-endian uint32) +// [28..] = raw KCP packet bytes const ( - kcpFrameMagic = byte(0x4B) - tokenOff = 1 - epochOff = 5 - epochHdrLen = 9 + tokenOff = 20 + epochOff = 24 + epochHdrLen = 28 ) type streamTransport struct { @@ -188,7 +192,7 @@ func (p *streamTransport) Connect(ctx context.Context) error { // packet sent in the current local session. func (p *streamTransport) epochHeader() [epochHdrLen]byte { var hdr [epochHdrLen]byte - hdr[0] = kcpFrameMagic + copy(hdr[:], vp8Keepalive) binary.BigEndian.PutUint32(hdr[tokenOff:epochOff], p.bindingToken) binary.BigEndian.PutUint32(hdr[epochOff:], p.localEpoch) return hdr @@ -290,7 +294,13 @@ func (p *streamTransport) WatchConnection(ctx context.Context) { } func (p *streamTransport) CanSend() bool { - return !p.closed.Load() && p.stream.CanSend() && + if p.closed.Load() { + return false + } + p.kcpMu.RLock() + hasKCP := p.kcp != nil + p.kcpMu.RUnlock() + return hasKCP && p.stream.CanSend() && len(p.outbound) < cap(p.outbound)*canSendHighWatermark/100 } @@ -314,10 +324,7 @@ func (p *streamTransport) writerLoop() { ticker := time.NewTicker(sampleInterval) defer ticker.Stop() - keepaliveEvery := int(keepaliveIdlePeriod / sampleInterval) - if keepaliveEvery < 1 { - keepaliveEvery = 1 - } + keepaliveEvery := max(int(keepaliveIdlePeriod/sampleInterval), 1) idleTicks := 0 for { @@ -336,7 +343,8 @@ func (p *streamTransport) writerLoop() { continue } idleTicks = 0 - sample = vp8Keepalive + hdr := p.epochHeader() + sample = hdr[:] } _ = p.track.WriteSample(media.Sample{ @@ -449,7 +457,7 @@ func (s *vp8FrameState) processRTPPacket(pkt *rtp.Packet) []byte { s.frameValid = false }() - if len(s.frameBuf) >= epochHdrLen && s.frameBuf[0] == kcpFrameMagic { + if len(s.frameBuf) >= epochHdrLen { frame := make([]byte, len(s.frameBuf)) copy(frame, s.frameBuf) return frame @@ -517,8 +525,8 @@ func (p *streamTransport) readVP8Track(track *webrtc.TrackRemote) { if len(preview) > 16 { preview = preview[:16] } - logger.Infof("vp8channel: frame #%d rtpPkts=%d len=%d first=%x magic=%v", - frameCount, rtpCount, len(frame), preview, frame[0] == kcpFrameMagic) + logger.Infof("vp8channel: frame #%d rtpPkts=%d len=%d first=%x", + frameCount, rtpCount, len(frame), preview) } p.handleIncomingFrame(frame) @@ -536,9 +544,6 @@ func (p *streamTransport) handleIncomingFrame(frame []byte) { } peerEpoch := binary.BigEndian.Uint32(frame[epochOff:epochHdrLen]) kcpPayload := frame[epochHdrLen:] - if len(kcpPayload) == 0 { - return - } // Some carriers/SFUs reflect our own published VP8 track back to us as a // remote track. Those frames carry our local epoch, not the peer's. If we // treat them as peer traffic, epoch tracking toggles between "self" and @@ -579,6 +584,9 @@ func (p *streamTransport) handleIncomingFrame(frame []byte) { return } + if len(kcpPayload) == 0 { + return + } p.kcpMu.RLock() rt := p.kcp p.kcpMu.RUnlock() @@ -586,3 +594,9 @@ func (p *streamTransport) handleIncomingFrame(frame []byte) { rt.deliver(kcpPayload) } } + +/* +ПИДОРАС ИЗ ЯНДЕКСА СУКА ПРЕЖДЕ ЧЕМ ФИКСИТЬ ЭТО ЗАДАЙСЯ ВОПРОСОМ: +ПОЧЕМУ ТВОЙ СЕРВЕР ПРИНИМАЕТ ПАКЕТЫ С SEQUENCE NUMBER ИЗ БУДУЩЕГО +БЛЯТЬ ДА ПОТОМУ ЧТО ОН ХУЙНЯ И ТВОЯ МАТЬ СОЛСЕТ МОЙ ХУЙ СУКА Я ЕЕ УБЬЮ БЛЯТЬ ОЕСЕНЮ СДЕЛАЮ ТЕСТО А ПОТОМ ЭТО ТЕСТО ВЫЕБУ БЛЯТЬ +*/ diff --git a/internal/transport/vp8channel/transport_test.go b/internal/transport/vp8channel/transport_test.go index 4939197..2ba62ad 100644 --- a/internal/transport/vp8channel/transport_test.go +++ b/internal/transport/vp8channel/transport_test.go @@ -106,14 +106,14 @@ func TestKCPLoopback(t *testing.T) { } func TestVP8KeepaliveDoesNotLookLikeKCP(t *testing.T) { - if len(vp8Keepalive) >= 1 && vp8Keepalive[0] == kcpFrameMagic { - t.Errorf("keepalive collides with kcp magic byte 0x%02x", kcpFrameMagic) + if len(vp8Keepalive) != tokenOff { + t.Errorf("vp8Keepalive length %d != tokenOff %d", len(vp8Keepalive), tokenOff) } } func testEpochHdr(epoch uint32) [epochHdrLen]byte { var hdr [epochHdrLen]byte - hdr[0] = kcpFrameMagic + copy(hdr[:], vp8Keepalive) binary.BigEndian.PutUint32(hdr[tokenOff:epochOff], bindingToken("test")) binary.BigEndian.PutUint32(hdr[epochOff:], epoch) return hdr @@ -130,7 +130,7 @@ func TestHandleIncomingFrameIgnoresLoopedBackLocalEpoch(t *testing.T) { tr.reconnectFn = func() { called.Add(1) } frame := make([]byte, epochHdrLen+4) - frame[0] = kcpFrameMagic + copy(frame, vp8Keepalive) binary.BigEndian.PutUint32(frame[tokenOff:epochOff], tr.bindingToken) binary.BigEndian.PutUint32(frame[epochOff:], tr.localEpoch) copy(frame[epochHdrLen:], []byte{1, 2, 3, 4}) @@ -159,7 +159,7 @@ func TestHandleIncomingFrameIgnoresForeignBindingToken(t *testing.T) { tr.reconnectFn = func() { called.Add(1) } frame := make([]byte, epochHdrLen+4) - frame[0] = kcpFrameMagic + copy(frame, vp8Keepalive) binary.BigEndian.PutUint32(frame[tokenOff:epochOff], bindingToken("other-client")) binary.BigEndian.PutUint32(frame[epochOff:], 999) copy(frame[epochHdrLen:], []byte{1, 2, 3, 4}) diff --git a/internal/transport/vp8channel/transport_unit_test.go b/internal/transport/vp8channel/transport_unit_test.go index e34e856..15fc286 100644 --- a/internal/transport/vp8channel/transport_unit_test.go +++ b/internal/transport/vp8channel/transport_unit_test.go @@ -113,7 +113,7 @@ func TestNewConnectSendCallbacksFeaturesAndClose(t *testing.T) { peerEpoch := uint32(0x200) firstFrame := make([]byte, epochHdrLen+4) - firstFrame[0] = kcpFrameMagic + copy(firstFrame, vp8Keepalive) binary.BigEndian.PutUint32(firstFrame[tokenOff:epochOff], tr.bindingToken) binary.BigEndian.PutUint32(firstFrame[epochOff:epochHdrLen], peerEpoch) copy(firstFrame[epochHdrLen:], []byte("data")) @@ -174,7 +174,7 @@ func TestEpochHeaderTokenAndOutboundCapacity(t *testing.T) { } hdr := tr.epochHeader() - if hdr[0] != kcpFrameMagic || + if !bytes.Equal(hdr[:tokenOff], vp8Keepalive) || binary.BigEndian.Uint32(hdr[tokenOff:epochOff]) != tr.bindingToken || binary.BigEndian.Uint32(hdr[epochOff:]) != tr.localEpoch { t.Fatalf("epochHeader() = %x", hdr) @@ -183,6 +183,15 @@ func TestEpochHeaderTokenAndOutboundCapacity(t *testing.T) { t.Fatal("bindingToken/randomEpoch returned zero") } + rt, err := startKCP(tr.outbound, nil, tr.epochHeader()) + if err != nil { + t.Fatalf("startKCP: %v", err) + } + defer rt.close() + tr.kcpMu.Lock() + tr.kcp = rt + tr.kcpMu.Unlock() + for len(tr.outbound) < cap(tr.outbound)*canSendHighWatermark/100 { tr.outbound <- []byte("queued") } @@ -200,7 +209,7 @@ func TestEpochHeaderTokenAndOutboundCapacity(t *testing.T) { } func TestVP8FrameStateAssemblesAndRejectsCorruptFrames(t *testing.T) { - frame := append([]byte{kcpFrameMagic}, bytes.Repeat([]byte{0x01}, epochHdrLen)...) + frame := append(append([]byte(nil), vp8Keepalive...), bytes.Repeat([]byte{0x01}, epochHdrLen-len(vp8Keepalive))...) var state vp8FrameState got := state.processRTPPacket(&rtp.Packet{ @@ -264,7 +273,7 @@ func TestHandleIncomingFrameEpochFilteringAndReconnect(t *testing.T) { mkFrame := func(token, epoch uint32, payload []byte) []byte { frame := make([]byte, epochHdrLen+len(payload)) - frame[0] = kcpFrameMagic + copy(frame, vp8Keepalive) binary.BigEndian.PutUint32(frame[tokenOff:epochOff], token) binary.BigEndian.PutUint32(frame[epochOff:epochHdrLen], epoch) copy(frame[epochHdrLen:], payload) @@ -273,12 +282,11 @@ func TestHandleIncomingFrameEpochFilteringAndReconnect(t *testing.T) { tr.handleIncomingFrame(mkFrame(bindingToken("other"), 1, []byte("x"))) tr.handleIncomingFrame(mkFrame(tr.bindingToken, tr.localEpoch, []byte("self"))) - tr.handleIncomingFrame(mkFrame(tr.bindingToken, 1, nil)) if tr.hadPeer.Load() || called != 0 { t.Fatal("filtered frames changed peer state") } - tr.handleIncomingFrame(mkFrame(tr.bindingToken, 1, []byte("first"))) + tr.handleIncomingFrame(mkFrame(tr.bindingToken, 1, nil)) if !tr.hadPeer.Load() || tr.peerEpoch.Load() != 1 { t.Fatalf("peer state after first frame: had=%v epoch=%d", tr.hadPeer.Load(), tr.peerEpoch.Load()) }