feat(vp8channel): wrap carrier in KCP for reliable, ordered delivery

Plug a KCP session in front of the VP8 wire so the upper layer (mux/curl
tunnels) gets reliable, ordered, message-oriented delivery on top of an
otherwise unreliable carrier.

Why this fixes the random 0 B/s stalls
--------------------------------------
The previous design had no retransmits and no ACKs. A single dropped
VP8 frame (from drainOutbound on micro-reconnect, the inbound 'default:'
overflow drop, or RTP loss invalidating a partial assembly) created a
hole in mux.Stream.nextSeq that would never be filled: the receiver
parked all subsequent frames in outOfOrder forever and curl read 0 B/s
until a brand new mux stream id was opened.

KCP layer details
-----------------
* Stream mode + 4-byte big-endian length prefix. Message mode would be
  ideal but UDPSession.Write fragments anything > MSS *outside* of
  kcp.Send, leaving every fragment with frg=0 and breaking PeekSize-based
  reassembly. Length-prefix framing under stream mode is the canonical
  workaround.
* nodelay(1, 10ms, fast-resend=2, no congestion control) — KCP turbo
  preset, identical to kcptun/shadowsocks tuning. Recovers from burst
  losses in tens of ms instead of seconds.
* SndWnd/RcvWnd 4096 segments to absorb the multi-segment burst that
  a single VP8 sample can carry.
* MTU 1400 (kcp-go hardcodes mtuLimit=1500).

Wire compatibility with VP8 keepalives
--------------------------------------
KCP packets always start with the LE conv id (0x01 0xEE 0xFF 0xC0). VP8
keepalive frames start with 0x30. The receive path filters by the magic
byte before handing the buffer to KCP, so keepalives never poison the
KCP state machine.

Features() now advertises Reliable+Ordered. Drop-on-overflow paths in
the carrier are kept — KCP detects the loss via SACK and retransmits.

Refs: transport/vp8-kcp
This commit is contained in:
zarazaex69
2026-05-02 18:03:44 +03:00
parent 940b32e1fb
commit 35c05b654a
6 changed files with 516 additions and 136 deletions

4
go.mod
View File

@@ -61,10 +61,14 @@ require (
github.com/pion/stun/v3 v3.1.2 // indirect
github.com/pion/transport/v4 v4.0.1 // indirect
github.com/pion/turn/v4 v4.1.4 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/puzpuzpuz/xsync/v3 v3.5.1 // indirect
github.com/redis/go-redis/v9 v9.17.2 // indirect
github.com/tjfoc/gmsm v1.4.1 // indirect
github.com/twitchtv/twirp v8.1.3+incompatible // indirect
github.com/wlynxg/anet v0.0.5 // indirect
github.com/xtaci/kcp-go/v5 v5.6.72 // indirect
github.com/xtaci/smux v1.5.57 // indirect
github.com/zeebo/xxh3 v1.1.0 // indirect
go.opentelemetry.io/otel v1.40.0 // indirect
go.uber.org/atomic v1.11.0 // indirect

69
go.sum
View File

@@ -6,10 +6,12 @@ buf.build/go/protoyaml v0.6.0 h1:Nzz1lvcXF8YgNZXk+voPPwdU8FjDPTUV4ndNTXN0n2w=
buf.build/go/protoyaml v0.6.0/go.mod h1:RgUOsBu/GYKLDSIRgQXniXbNgFlGEZnQpRAUdLAFV2Q=
cel.dev/expr v0.25.1 h1:1KrZg61W6TWSxuNZ37Xy49ps13NUovb66QLprthtwi4=
cel.dev/expr v0.25.1/go.mod h1:hrXvqGP6G6gyx8UAHSHJ5RGk//1Oj5nXQ2NI02Nrsg4=
cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
dario.cat/mergo v1.0.0 h1:AGCNq9Evsj31mOgNPcLyXc+4PNABt905YmuqPYYpBWk=
dario.cat/mergo v1.0.0/go.mod h1:uNxQE+84aUszobStD9th8a29P2fMDhsBdgRYvZOxGmk=
github.com/Azure/go-ansiterm v0.0.0-20230124172434-306776ec8161 h1:L/gRVlceqvL25UVaW/CKtUDjefjrs0SPonmDGUVOYP0=
github.com/Azure/go-ansiterm v0.0.0-20230124172434-306776ec8161/go.mod h1:xomTg63KZ2rFqZQzSB4Vz2SUXa1BpHTVz9L5PTmPC4E=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/Microsoft/go-winio v0.6.2 h1:F2VQgta7ecxGYO8k3ZZz3RS8fVIXVxONVUPlNERoyfY=
github.com/Microsoft/go-winio v0.6.2/go.mod h1:yd8OoFMLzJbo9gZq8j5qaps8bJ9aShtEA8Ipt1oGCvU=
github.com/Nvveen/Gotty v0.0.0-20120604004816-cd527374f1e5 h1:TngWCqHvy9oXAN6lEVMRuU21PR1EtLVZJmdB18Gu3Rw=
@@ -28,8 +30,11 @@ github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA=
github.com/bsm/gomega v1.27.10/go.mod h1:JyEr/xRbxbtgWNi8tIEVPUYZ5Dzef52k01W3YH0H+O0=
github.com/cenkalti/backoff/v4 v4.3.0 h1:MyRJ/UdXutAwSAT+s3wNd7MfTIcy71VQueUuFK343L8=
github.com/cenkalti/backoff/v4 v4.3.0/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc=
github.com/containerd/continuity v0.4.5 h1:ZRoN1sXq9u7V6QoHMcVWGhOwDFqZ4B9i5H6un1Wh0x4=
github.com/containerd/continuity v0.4.5/go.mod h1:/lNJvtJKUQStBzpVQ1+rasXO1LAWtUQssk28EZvJ3nE=
github.com/containerd/errdefs v1.0.0 h1:tg5yIfIlQIrxYtu9ajqY42W3lpS19XqdxRQeEwYG8PI=
@@ -51,6 +56,9 @@ github.com/docker/go-connections v0.6.0 h1:LlMG9azAe1TqfR7sO+NJttz1gy6KO7VJBh+pM
github.com/docker/go-connections v0.6.0/go.mod h1:AahvXYshr6JgfUJGdDCs2b5EZG/vmaMAntpSFH5BFKE=
github.com/docker/go-units v0.5.0 h1:69rxXcBk27SvSaaxTtLh/8llcHD8vYHT7WSdRZ/jvr4=
github.com/docker/go-units v0.5.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk=
github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98=
github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2Wg=
github.com/felixge/httpsnoop v1.0.4/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U=
github.com/frostbyte73/core v0.1.1 h1:ChhJOR7bAKOCPbA+lqDLE2cGKlCG5JXsDvvQr4YaJIA=
@@ -68,10 +76,25 @@ github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag=
github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE=
github.com/go-viper/mapstructure/v2 v2.4.0 h1:EBsztssimR/CONLSZZ04E8qAkxNYq4Qp9LvH92wZUgs=
github.com/go-viper/mapstructure/v2 v2.4.0/go.mod h1:oJDH3BJKyqBA2TXFhDsKDGDTlndYOZ6rGS0BRZIxGhM=
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.3.3/go.mod h1:vzj43D7+SQXF/4pzW/hwtAqwc6iTitCiVSaWz5lYuqw=
github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8=
github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA=
github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrUpVNzEA03Pprs=
github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:WU3c8KckQ9AFe+yFwt9sWVRKCVIyN9cPHBJSNnbL67w=
github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0=
github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI=
github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek=
github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps=
github.com/google/cel-go v0.27.0 h1:e7ih85+4qVrBuqQWTW4FKSqZYokVuc3HnhH5keboFTo=
github.com/google/cel-go v0.27.0/go.mod h1:tTJ11FWqnhw5KKpnWpvW9CJC3Y9GK4EIS0WXnBbebzw=
github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M=
github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8=
github.com/google/go-cmp v0.7.0/go.mod h1:pXiqmnSA92OHEEa9HXL2W4E7lf9JzCmGVUdgjX3N/iU=
@@ -172,6 +195,7 @@ github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINE
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U=
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
github.com/puzpuzpuz/xsync/v3 v3.5.1 h1:GJYJZwO6IdxN/IKbneznS6yPkVC+c3zyY/j19c++5Fg=
github.com/puzpuzpuz/xsync/v3 v3.5.1/go.mod h1:VjzYrABPabuM4KyBh1Ftq6u8nhwY5tBPKP9jpmh0nnA=
github.com/redis/go-redis/v9 v9.17.2 h1:P2EGsA4qVIM3Pp+aPocCJ7DguDHhqrXNhVcEp4ViluI=
@@ -186,6 +210,8 @@ github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.11.1 h1:7s2iGBzp5EwR7/aIZr8ao5+dra3wiQyKjjFuvgVKu7U=
github.com/stretchr/testify v1.11.1/go.mod h1:wZwfW3scLgRK+23gO65QZefKpKQRnfz6sD981Nm4B6U=
github.com/tjfoc/gmsm v1.4.1 h1:aMe1GlZb+0bLjn+cKTPEvvn9oUEBlJitaZiiBwsbgho=
github.com/tjfoc/gmsm v1.4.1/go.mod h1:j4INPkHWMrhJb38G+J6W4Tw0AbuN8Thu3PbdVYhVcTE=
github.com/twitchtv/twirp v8.1.3+incompatible h1:+F4TdErPgSUbMZMwp13Q/KgDVuI7HJXP61mNV3/7iuU=
github.com/twitchtv/twirp v8.1.3+incompatible/go.mod h1:RRJoFSAmTEh2weEqWtpPE3vFK5YBhA6bqp2l1kfCC5A=
github.com/wlynxg/anet v0.0.5 h1:J3VJGi1gvo0JwZ/P1/Yc/8p63SoW98B5dHkYDmpgvvU=
@@ -196,6 +222,10 @@ github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415 h1:EzJWgHo
github.com/xeipuuv/gojsonreference v0.0.0-20180127040603-bd5ef7bd5415/go.mod h1:GwrjFmJcFw6At/Gs6z4yjiIwzuJ1/+UwLxMQDVQXShQ=
github.com/xeipuuv/gojsonschema v1.2.0 h1:LhYJRs+L4fBtjZUfuSZIKGeVu0QRy8e5Xi7D17UxZ74=
github.com/xeipuuv/gojsonschema v1.2.0/go.mod h1:anYRn/JVcOK2ZgGU+IjEV4nwlhoK5sQluxsYJ78Id3Y=
github.com/xtaci/kcp-go/v5 v5.6.72 h1:FLaQPalgpufJYQRk0OK+gErEhXGLUPjv6FSRPrFR8Lk=
github.com/xtaci/kcp-go/v5 v5.6.72/go.mod h1:9O3D8WR+cyyUjGiTILYfg17vn72otWuXK2AFfqIe6CM=
github.com/xtaci/smux v1.5.57 h1:N72VbGoSYxgcm6mPOYX0QzEZNVD3UI/JlVvAtXF+WrY=
github.com/xtaci/smux v1.5.57/go.mod h1:IGQ9QYrBphmb/4aTnLEcJby0TNr3NV+OslIOMrX825Q=
github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY=
github.com/zarazaex69/b v0.0.0-20260423064626-c0bd20863b89 h1:ytA0RfQZTYfjqFA9lBJMX1DTnXpTuKg0nf4udgdpunE=
github.com/zarazaex69/b v0.0.0-20260423064626-c0bd20863b89/go.mod h1:OUqzZNoXsg+ccaiAnSe0t4f8qc0W/cFx6io0lWsE1Gw=
@@ -228,31 +258,49 @@ go.uber.org/zap/exp v0.3.0/go.mod h1:5I384qq7XGxYyByIhHm6jg5CHkGY0nsTfbDLgDDlgJQ
go.yaml.in/yaml/v3 v3.0.4 h1:tfq32ie2Jv2UxXFdLJdh3jXuOzWiL1fo0bu/FbuKpbc=
go.yaml.in/yaml/v3 v3.0.4/go.mod h1:DhzuOOF2ATzADvBadXxruRBLzYTpT36CKvDb3+aBEFg=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20201012173705-84dcc777aaee/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/crypto v0.19.0/go.mod h1:Iy9bg/ha4yyC70EfRS8jz+B6ybOBKMaSxLj6P6oBDfU=
golang.org/x/crypto v0.50.0 h1:zO47/JPrL6vsNkINmLoo/PH1gcxpls50DNogFvB5ZGI=
golang.org/x/crypto v0.50.0/go.mod h1:3muZ7vA7PBCE6xgPX7nkzzjiUq87kRItoJQM1Yo8S+Q=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20260212183809-81e46e3db34a h1:ovFr6Z0MNmU7nH8VaX5xqw+05ST2uO1exVfZPVqRC5o=
golang.org/x/exp v0.0.0-20260212183809-81e46e3db34a/go.mod h1:K79w1Vqn7PoiZn+TkNpx3BUWUQksGO3JcVX6qIjytmA=
golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU=
golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
golang.org/x/mobile v0.0.0-20260410095206-2cfb76559b7b h1:Qt2eaXcZ8x20iAcoZ6AceeMMtnjuPHvC51KRCH1DKSQ=
golang.org/x/mobile v0.0.0-20260410095206-2cfb76559b7b/go.mod h1:5Fu78lew5ucMXt8w2KYcwvxu2rkC/liHzUvaoiI+H/M=
golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4=
golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
golang.org/x/mod v0.35.0 h1:Ww1D637e6Pg+Zb2KrWfHQUnH2dQRLBQyAtpr/haaJeM=
golang.org/x/mod v0.35.0/go.mod h1:+GwiRhIInF8wPm+4AoT6L0FA1QWAad3OMdTRx4tFYlU=
golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20201010224723-4f7140c49acb/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c=
golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs=
golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg=
golang.org/x/net v0.53.0 h1:d+qAbo5L0orcWAr0a9JweQpjXF19LMXJE8Ey7hwOdUA=
golang.org/x/net v0.53.0/go.mod h1:JvMuJH7rrdiCfbeHoo3fCQU24Lf5JJwT9W3sJFulfgs=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.20.0 h1:e0PTpb7pjO8GAtTs2dQ6jYa5BWYlMuX047Dco/pItO4=
golang.org/x/sync v0.20.0/go.mod h1:9xrNwdLfx4jkKbNva9FpL6vEN7evnE43NNNJQ2LF3+0=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
@@ -278,20 +326,39 @@ golang.org/x/text v0.36.0/go.mod h1:NIdBknypM8iqVmPiuco0Dh6P5Jcdk8lJL0CUebqK164=
golang.org/x/time v0.15.0 h1:bbrp8t3bGUeFOx08pvsMYRTCVSMk89u4tKbNOZbp88U=
golang.org/x/time v0.15.0/go.mod h1:Y4YMaQmXwGQZoFaVFk4YpCt4FLQMYKZe9oeV/f4MSno=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY=
golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q=
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc=
golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU=
golang.org/x/tools v0.44.0 h1:UP4ajHPIcuMjT1GqzDWRlalUEoY+uzoZKnhOjbIPD2c=
golang.org/x/tools v0.44.0/go.mod h1:KA0AfVErSdxRZIsOVipbv3rQhVXTnlU6UhKxHd1seDI=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1 h1:go1bK/D/BFZV2I8cIQd1NKEZ+0owSTG1fDTci4IqFcE=
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM=
google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4=
google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc=
google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc=
google.golang.org/genproto/googleapis/api v0.0.0-20260209200024-4cfbd4190f57 h1:JLQynH/LBHfCTSbDWl+py8C+Rg/k1OVH3xfcaiANuF0=
google.golang.org/genproto/googleapis/api v0.0.0-20260209200024-4cfbd4190f57/go.mod h1:kSJwQxqmFXeo79zOmbrALdflXQeAYcUbgS7PbpMknCY=
google.golang.org/genproto/googleapis/rpc v0.0.0-20260209200024-4cfbd4190f57 h1:mWPCjDEyshlQYzBpMNHaEof6UX1PmHcaUODUywQ0uac=
google.golang.org/genproto/googleapis/rpc v0.0.0-20260209200024-4cfbd4190f57/go.mod h1:j9x/tPzZkyxcgEFkiKEEGxfvyumM01BEtsW8xzOahRQ=
google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg=
google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQciAY=
google.golang.org/grpc v1.31.0/go.mod h1:N36X2cJ7JwdamYAgDz+s+rVMFjt3numwzf/HckM8pak=
google.golang.org/grpc v1.79.1 h1:zGhSi45ODB9/p3VAawt9a+O/MULLl9dpizzNNpq7flY=
google.golang.org/grpc v1.79.1/go.mod h1:KmT0Kjez+0dde/v2j9vzwoAScgEPx/Bw1CYChhHLrHQ=
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=
google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM=
google.golang.org/protobuf v1.20.1-0.20200309200217-e05f789c0967/go.mod h1:A+miEFZTKqfCUM6K7xSMQL9OKL/b6hQv+e19PK+JZNE=
google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo=
google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
google.golang.org/protobuf v1.36.11 h1:fV6ZwhNocDyBLK0dj+fg8ektcVegBBuEolpbTQyBNVE=
google.golang.org/protobuf v1.36.11/go.mod h1:HTf+CrKn2C3g5S8VImy6tdcUvCska2kB7j23XfzDpco=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
@@ -300,5 +367,7 @@ gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
rsc.io/qr v0.2.0 h1:6vBLea5/NRMVTz8V66gipeLycZMl/+UlFmk8DvqQ6WY=
rsc.io/qr v0.2.0/go.mod h1:IF+uZjkb9fqyeF/4tlBoynqmQxUoPfWEKh921coOuXs=

View File

@@ -0,0 +1,149 @@
package vp8channel
import (
"encoding/binary"
"errors"
"fmt"
"io"
"sync"
kcp "github.com/xtaci/kcp-go/v5"
)
// Both peers establish a KCP session with the same convid. KCP does not
// require a handshake — packets are matched by conv field, so a static
// constant gives us a symmetrical P2P setup.
const kcpConvID = 0xC0FFEE01
// KCP tuning targets a lossy, bursty carrier (VP8 over an SFU). The defaults
// are TCP-like and recover slowly after burst losses.
const (
// kcp-go hardcodes mtuLimit=1500, so SetMtu() above this is silently
// clamped. Stay below that with headroom for KCP overhead (24 bytes).
kcpMTU = 1400
// Receive/send window in segments. Large window allows in-flight bursts
// without stalling — important when one VP8 frame may carry many KCP
// segments and ACKs trickle back at frame cadence.
kcpSndWnd = 4096
kcpRcvWnd = 4096
// Length prefix for our message framing on top of KCP stream mode.
// We use stream mode because UDPSession.Write fragments messages > MSS
// outside of kcp.Send, which destroys the frg field that message mode
// relies on for boundary preservation. Adding our own length-prefix
// framing sidesteps that bug entirely.
kcpLenPrefix = 4
// Hard cap on a single message. Anything larger would require an
// unbounded reassembly buffer on the receiver and is almost certainly
// a protocol error upstream.
kcpMaxMessage = 8 * 1024 * 1024
)
// ErrKCPMessageTooLarge is returned by send when the message exceeds
// kcpMaxMessage.
var ErrKCPMessageTooLarge = errors.New("vp8channel: kcp message exceeds maximum size")
// kcpRuntime owns the KCP session and the goroutine that pumps reassembled
// messages from KCP up to cfg.OnData.
type kcpRuntime struct {
conn *kcpConn
sess *kcp.UDPSession
readDone chan struct{}
writeMu sync.Mutex // serializes length-prefix + payload writes
closeOnce sync.Once
}
func startKCP(out chan<- []byte, onData func([]byte)) (*kcpRuntime, error) {
c := newKCPConn(out, inboundQueueSize)
sess, err := kcp.NewConn3(kcpConvID, fakeAddr, nil, 0, 0, c)
if err != nil {
_ = c.Close()
return nil, fmt.Errorf("kcp new conn: %w", err)
}
// Aggressive ARQ tuning: nodelay=1, interval=10ms, fast resend=2, no
// congestion control. This is the standard "turbo" preset used by KCP
// tunnels on lossy networks (shadowsocks, kcptun) and is the whole point
// of choosing KCP over SCTP.
sess.SetNoDelay(1, 10, 2, 1)
sess.SetWindowSize(kcpSndWnd, kcpRcvWnd)
sess.SetMtu(kcpMTU)
sess.SetStreamMode(true) // see kcpLenPrefix comment above
sess.SetACKNoDelay(true)
sess.SetWriteDelay(false)
rt := &kcpRuntime{
conn: c,
sess: sess,
readDone: make(chan struct{}),
}
go rt.readLoop(onData)
return rt, nil
}
func (r *kcpRuntime) readLoop(onData func([]byte)) {
defer close(r.readDone)
var hdr [kcpLenPrefix]byte
for {
if _, err := io.ReadFull(r.sess, hdr[:]); err != nil {
return
}
size := binary.BigEndian.Uint32(hdr[:])
if size == 0 {
continue
}
if size > kcpMaxMessage {
// Stream framing is now corrupted — there is no safe way to
// resync without a session reset. Bail and let the upper layer
// reconnect.
return
}
payload := make([]byte, size)
if _, err := io.ReadFull(r.sess, payload); err != nil {
return
}
if onData != nil {
onData(payload)
}
}
}
// deliver hands a wire payload (already reassembled out of VP8 RTP) to KCP.
func (r *kcpRuntime) deliver(payload []byte) {
r.conn.deliver(payload)
}
// send queues an application message for reliable delivery. The length
// prefix + payload pair is written under a mutex so that interleaved
// concurrent senders cannot tear the framing.
func (r *kcpRuntime) send(msg []byte) error {
if len(msg) > kcpMaxMessage {
return ErrKCPMessageTooLarge
}
var hdr [kcpLenPrefix]byte
binary.BigEndian.PutUint32(hdr[:], uint32(len(msg)))
r.writeMu.Lock()
defer r.writeMu.Unlock()
if _, err := r.sess.Write(hdr[:]); err != nil {
return err
}
if _, err := r.sess.Write(msg); err != nil {
return err
}
return nil
}
func (r *kcpRuntime) close() {
r.closeOnce.Do(func() {
_ = r.sess.Close()
_ = r.conn.Close()
})
}

View File

@@ -0,0 +1,143 @@
package vp8channel
import (
"net"
"sync"
"time"
)
// fakeAddr is a placeholder address used by the KCP session. The underlying
// "packet conn" is a point-to-point pipe over the VP8 carrier and has no real
// notion of an address, but kcp-go's API requires one.
var fakeAddr = &net.UDPAddr{IP: net.IPv4(127, 0, 0, 1), Port: 1}
// kcpConn is a net.PacketConn implementation that bridges kcp-go on top of
// the vp8channel byte-message carrier.
//
// kcp.UDPSession ──Write──▶ WriteTo ──▶ outbound chan ──▶ VP8 wire
// kcp.UDPSession ◀──Read── ReadFrom ◀── inbound (deliver) ◀── VP8 wire
//
// All packet boundaries are preserved by the underlying transport, which is
// exactly what KCP expects from a UDP-like conn.
type kcpConn struct {
out chan<- []byte
in chan []byte
closed chan struct{}
closeOnce sync.Once
mu sync.Mutex
rDeadline time.Time
wDeadline time.Time
}
func newKCPConn(out chan<- []byte, inboundCap int) *kcpConn {
if inboundCap <= 0 {
inboundCap = 1024
}
return &kcpConn{
out: out,
in: make(chan []byte, inboundCap),
closed: make(chan struct{}),
}
}
// deliver hands an incoming wire payload to the KCP read loop. Drops on
// overflow are intentional — KCP will detect the loss via SACK and retransmit.
func (c *kcpConn) deliver(payload []byte) {
cp := make([]byte, len(payload))
copy(cp, payload)
select {
case c.in <- cp:
case <-c.closed:
default:
}
}
func (c *kcpConn) ReadFrom(p []byte) (int, net.Addr, error) {
c.mu.Lock()
deadline := c.rDeadline
c.mu.Unlock()
var timerC <-chan time.Time
if !deadline.IsZero() {
d := time.Until(deadline)
if d <= 0 {
return 0, nil, errTimeout{}
}
t := time.NewTimer(d)
defer t.Stop()
timerC = t.C
}
select {
case msg := <-c.in:
n := copy(p, msg)
return n, fakeAddr, nil
case <-c.closed:
return 0, nil, net.ErrClosed
case <-timerC:
return 0, nil, errTimeout{}
}
}
func (c *kcpConn) WriteTo(p []byte, _ net.Addr) (int, error) {
buf := make([]byte, len(p))
copy(buf, p)
c.mu.Lock()
deadline := c.wDeadline
c.mu.Unlock()
var timerC <-chan time.Time
if !deadline.IsZero() {
d := time.Until(deadline)
if d <= 0 {
return 0, errTimeout{}
}
t := time.NewTimer(d)
defer t.Stop()
timerC = t.C
}
select {
case c.out <- buf:
return len(p), nil
case <-c.closed:
return 0, net.ErrClosed
case <-timerC:
return 0, errTimeout{}
}
}
func (c *kcpConn) Close() error {
c.closeOnce.Do(func() { close(c.closed) })
return nil
}
func (c *kcpConn) LocalAddr() net.Addr { return fakeAddr }
func (c *kcpConn) SetDeadline(t time.Time) error {
_ = c.SetReadDeadline(t)
_ = c.SetWriteDeadline(t)
return nil
}
func (c *kcpConn) SetReadDeadline(t time.Time) error {
c.mu.Lock()
c.rDeadline = t
c.mu.Unlock()
return nil
}
func (c *kcpConn) SetWriteDeadline(t time.Time) error {
c.mu.Lock()
c.wDeadline = t
c.mu.Unlock()
return nil
}
type errTimeout struct{}
func (errTimeout) Error() string { return "i/o timeout" }
func (errTimeout) Timeout() bool { return true }
func (errTimeout) Temporary() bool { return true }

View File

@@ -2,7 +2,6 @@ package vp8channel
import (
"context"
"encoding/binary"
"errors"
"fmt"
"sync"
@@ -20,7 +19,6 @@ import (
const (
defaultMaxPayloadSize = 60 * 1024
defaultConnectTimeout = 30 * time.Second
dataMarker = 0xFF
rtpBufSize = 65536
outboundQueueSize = 1024
inboundQueueSize = 1024
@@ -33,31 +31,37 @@ var (
ErrTransportClosed = errors.New("vp8channel transport closed")
)
// vp8Keepalive is a minimal VP8 keyframe used as idle filler. It must be
// a keyframe (P-bit=0) so that the SFU/decoder has a valid reference and
// forwards subsequent frames; switching to a P-frame here causes the SFU
// to drop the entire stream until a keyframe arrives.
// vp8Keepalive is a minimal VP8 keyframe used as idle filler so that the SFU
// keeps the track flowing when KCP has nothing to send. It is never delivered
// to KCP because KCP packets always start with the convid (0xC0FFEE01 LE)
// and would never collide with this keyframe payload.
var vp8Keepalive = []byte{
0x30, 0x01, 0x00, 0x9d, 0x01, 0x2a, 0x10, 0x00,
0x10, 0x00, 0x00, 0x47, 0x08, 0x85, 0x85, 0x88,
0x99, 0x84, 0x88, 0xfc,
}
// kcpMagic is the little-endian first byte of a KCP packet (low byte of
// kcpConvID = 0xC0FFEE01). Anything that does not match is treated as
// non-KCP traffic (idle keepalives, stray frames after reconnect) and
// dropped before reaching the protocol stack.
const kcpMagic = byte(0x01)
type streamTransport struct {
stream carrier.VideoTrack
track *webrtc.TrackLocalStaticSample
onData func([]byte)
outbound chan []byte
inbound chan []byte
closeCh chan struct{}
writerDone chan struct{}
dispatchDone chan struct{}
closed atomic.Bool
writerUp atomic.Bool
dispatchUp atomic.Bool
startOnce sync.Once
frameInterval time.Duration
batchSize int
kcp *kcpRuntime
kcpMu sync.RWMutex
}
func New(ctx context.Context, cfg transport.Config) (transport.Transport, error) {
@@ -103,10 +107,8 @@ func New(ctx context.Context, cfg transport.Config) (transport.Transport, error)
track: track,
onData: cfg.OnData,
outbound: make(chan []byte, outboundQueueSize),
inbound: make(chan []byte, inboundQueueSize),
closeCh: make(chan struct{}),
writerDone: make(chan struct{}),
dispatchDone: make(chan struct{}),
frameInterval: time.Second / time.Duration(fps),
batchSize: batchSize,
}
@@ -127,14 +129,25 @@ func (p *streamTransport) Connect(ctx context.Context) error {
return err
}
var startErr error
p.startOnce.Do(func() {
// Start KCP first so the writerLoop has packets to forward as soon
// as it begins ticking. KCP's own update goroutine drives keepalives
// and ACKs once the session is up.
rt, err := startKCP(p.outbound, p.onData)
if err != nil {
startErr = err
return
}
p.kcpMu.Lock()
p.kcp = rt
p.kcpMu.Unlock()
p.writerUp.Store(true)
go p.writerLoop()
p.dispatchUp.Store(true)
go p.dispatchLoop()
})
return nil
return startErr
}
func (p *streamTransport) Send(data []byte) error {
@@ -142,25 +155,30 @@ func (p *streamTransport) Send(data []byte) error {
return ErrTransportClosed
}
frame := encodeDataFrame(data)
select {
case <-p.closeCh:
p.kcpMu.RLock()
rt := p.kcp
p.kcpMu.RUnlock()
if rt == nil {
return ErrTransportClosed
case p.outbound <- frame:
return nil
}
return rt.send(data)
}
func (p *streamTransport) Close() error {
if p.closed.CompareAndSwap(false, true) {
close(p.closeCh)
p.kcpMu.RLock()
rt := p.kcp
p.kcpMu.RUnlock()
if rt != nil {
rt.close()
}
if p.writerUp.Load() {
<-p.writerDone
}
if p.dispatchUp.Load() {
<-p.dispatchDone
}
return p.stream.Close()
}
return nil
@@ -178,6 +196,10 @@ func (p *streamTransport) drainOutbound() {
func (p *streamTransport) SetReconnectCallback(cb func()) {
p.stream.SetReconnectCallback(func() {
// Drain stale KCP segments queued for the old wire. KCP will
// retransmit anything that mattered after the link is back up,
// so dropping the queue here only saves us from sending obsolete
// data that the peer would discard anyway.
p.drainOutbound()
if cb != nil {
cb()
@@ -202,10 +224,13 @@ func (p *streamTransport) CanSend() bool {
len(p.outbound) < cap(p.outbound)*canSendHighWatermark/100
}
// Features advertises reliable+ordered semantics now that KCP guarantees
// in-order delivery with retransmits. The upper layer (mux/curl tunnel)
// can rely on these properties end-to-end.
func (p *streamTransport) Features() transport.Features {
return transport.Features{
Reliable: false,
Ordered: false,
Reliable: true,
Ordered: true,
MessageOriented: true,
MaxPayloadSize: defaultMaxPayloadSize,
}
@@ -299,9 +324,9 @@ func (p *streamTransport) readVP8Track(track *webrtc.TrackRemote) {
// Detect packet loss / reordering. A single missing RTP packet
// inside a fragmented VP8 frame would otherwise silently corrupt
// the assembled payload (and bleed into the next frame), so we
// invalidate the current assembly and only resume on a fresh
// start-of-partition packet.
// the assembled payload (and bleed into the next frame). KCP can
// recover from full-frame drops, but only if the frames it does
// receive are byte-perfect.
if haveLastSeq {
expected := lastSeq + 1
if pkt.SequenceNumber != expected {
@@ -331,57 +356,20 @@ func (p *streamTransport) readVP8Track(track *webrtc.TrackRemote) {
frameBuf = append(frameBuf, vp8Payload...)
if pkt.Marker {
data := extractDataFromPayload(frameBuf)
if len(frameBuf) >= 4 && frameBuf[0] == kcpMagic {
p.kcpMu.RLock()
rt := p.kcp
p.kcpMu.RUnlock()
if rt != nil {
// Copy out of the shared frame buffer before handing
// the payload off — KCP's deliver path is async.
payload := make([]byte, len(frameBuf))
copy(payload, frameBuf)
rt.deliver(payload)
}
}
frameBuf = frameBuf[:0]
frameValid = false
if data == nil {
continue
}
// Copy out of the shared frame buffer before handing the
// payload to the dispatch goroutine.
payload := make([]byte, len(data))
copy(payload, data)
// Non-blocking enqueue: dropping is preferable to stalling
// the RTP read loop, which would let pion's UDP buffer
// overflow and cause cascading packet loss.
select {
case p.inbound <- payload:
default:
}
}
}
}
func (p *streamTransport) dispatchLoop() {
defer close(p.dispatchDone)
for {
select {
case <-p.closeCh:
return
case payload := <-p.inbound:
if p.onData != nil {
p.onData(payload)
}
}
}
}
func encodeDataFrame(data []byte) []byte {
frame := make([]byte, 5+len(data))
frame[0] = dataMarker
binary.BigEndian.PutUint32(frame[1:5], uint32(len(data)))
copy(frame[5:], data)
return frame
}
func extractDataFromPayload(frame []byte) []byte {
if len(frame) < 5 || frame[0] != dataMarker {
return nil
}
length := binary.BigEndian.Uint32(frame[1:5])
if len(frame) < int(5+length) {
return nil
}
return frame[5 : 5+length]
}

View File

@@ -2,73 +2,100 @@ package vp8channel
import (
"bytes"
"sync"
"testing"
"time"
)
func TestEncodeDecodeDataFrame(t *testing.T) {
testCases := []struct {
name string
data []byte
}{
{"empty", []byte{}},
{"small", []byte("hello")},
{"medium", bytes.Repeat([]byte("x"), 1000)},
{"large", bytes.Repeat([]byte("y"), 50000)},
// TestKCPLoopback runs two KCP runtimes back-to-back through an in-memory
// pipe simulating a perfect carrier. Verifies that messages survive the
// KCP layer with their boundaries intact.
func TestKCPLoopback(t *testing.T) {
a2b := make(chan []byte, 256)
b2a := make(chan []byte, 256)
var bRecvMu sync.Mutex
var bRecv [][]byte
doneB := make(chan struct{})
rtA, err := startKCP(a2b, nil)
if err != nil {
t.Fatalf("startKCP A: %v", err)
}
defer rtA.close()
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
encoded := encodeDataFrame(tc.data)
if encoded[0] != dataMarker {
t.Errorf("expected marker 0x%02x, got 0x%02x", dataMarker, encoded[0])
}
decoded := extractDataFromPayload(encoded)
if decoded == nil {
t.Fatal("extractDataFromPayload returned nil")
}
if !bytes.Equal(decoded, tc.data) {
t.Errorf("data mismatch: got %d bytes, want %d bytes", len(decoded), len(tc.data))
rtB, err := startKCP(b2a, func(msg []byte) {
bRecvMu.Lock()
bRecv = append(bRecv, append([]byte(nil), msg...))
n := len(bRecv)
bRecvMu.Unlock()
if n == 3 {
close(doneB)
}
})
if err != nil {
t.Fatalf("startKCP B: %v", err)
}
defer rtB.close()
// Pump packets between the two runtimes.
stop := make(chan struct{})
defer close(stop)
go func() {
for {
select {
case <-stop:
return
case pkt := <-a2b:
rtB.deliver(pkt)
}
}
}()
go func() {
for {
select {
case <-stop:
return
case pkt := <-b2a:
rtA.deliver(pkt)
}
}
}()
msgs := [][]byte{
[]byte("hello"),
bytes.Repeat([]byte("x"), 1000),
bytes.Repeat([]byte("y"), 20000),
}
for _, m := range msgs {
if err := rtA.send(m); err != nil {
t.Fatalf("send: %v", err)
}
}
func TestExtractDataFromPayload_Invalid(t *testing.T) {
testCases := []struct {
name string
frame []byte
}{
{"too short", []byte{0xFF, 0x00}},
{"wrong marker", []byte{0x9D, 0x01, 0x2A, 0x00, 0x00}},
{"length mismatch", []byte{0xFF, 0x00, 0x00, 0x00, 0x10, 0x01, 0x02}},
select {
case <-doneB:
case <-time.After(5 * time.Second):
t.Fatal("timeout waiting for messages")
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
result := extractDataFromPayload(tc.frame)
if result != nil {
t.Errorf("expected nil, got %v", result)
bRecvMu.Lock()
defer bRecvMu.Unlock()
if len(bRecv) != len(msgs) {
t.Fatalf("got %d messages, want %d", len(bRecv), len(msgs))
}
for i, m := range msgs {
if !bytes.Equal(bRecv[i], m) {
t.Errorf("msg %d mismatch: got %d bytes, want %d", i, len(bRecv[i]), len(m))
}
})
}
}
func TestExtractDataFromPayload_Keepalive(t *testing.T) {
result := extractDataFromPayload(vp8Keepalive)
if result != nil {
t.Errorf("keepalive should return nil, got %v", result)
}
}
func TestVP8KeepaliveFormat(t *testing.T) {
if len(vp8Keepalive) < 3 {
t.Fatal("keepalive too short")
}
if vp8Keepalive[0] == dataMarker {
t.Error("keepalive should not start with data marker")
func TestVP8KeepaliveDoesNotLookLikeKCP(t *testing.T) {
// Keepalive frames must not be mistaken for KCP packets by the receive
// path; otherwise the KCP stack would constantly chew on garbage.
if len(vp8Keepalive) >= 1 && vp8Keepalive[0] == kcpMagic {
t.Errorf("keepalive collides with kcp magic byte 0x%02x", kcpMagic)
}
}