Merge upstream master

This commit is contained in:
Qtozdec
2026-04-10 15:31:26 +03:00
7 changed files with 130 additions and 126 deletions

3
cnc.sh
View File

@@ -1,5 +1,8 @@
#!/bin/bash
echo "ЕСЛИ У ВАС ЕСТЬ ПРОБЛЕМЫ - Я В КУРСЕ, ПРОЕКТ В БЕТЕ, ПО ПРОБЛЕМАМ В ЧАТ t.me/openlibrecommunity ИЛИ ВООБЩЕ НЕКУДА, ЖДИТЕ РЕЛИЗА"
set -e
CONTAINER_NAME="olcrtc-client"

View File

@@ -166,7 +166,6 @@ func (c *Client) onData(data []byte) {
return
}
logger.Verbose("Received %d bytes from server", len(plaintext))
c.mux.HandleFrame(plaintext)
}
@@ -208,7 +207,6 @@ func (c *Client) runSOCKS5(ctx context.Context, port int, username, password str
func (c *Client) handleSOCKS5(conn net.Conn, username, password string) {
defer conn.Close()
startTime := time.Now()
buf := make([]byte, 513)
@@ -316,36 +314,20 @@ func (c *Client) handleSOCKS5(conn net.Conn, username, password string) {
}
reqData, _ := json.Marshal(req)
sendTime := time.Now()
queueLen := 0
buffered := uint64(0)
for _, peer := range c.peers {
if peer != nil {
queueLen += len(peer.GetSendQueue())
buffered += peer.GetBufferedAmount()
}
}
c.mux.SendData(sid, reqData)
log.Printf("[CLIENT] sid=%d SEND_REQUEST elapsed=%v queue_len=%d dc_buffered=%d",
sid, time.Since(sendTime), queueLen, buffered)
dataReady := c.mux.WaitForData(sid)
timeout := time.NewTimer(10 * time.Second)
defer timeout.Stop()
waitStart := time.Now()
select {
case <-dataReady:
log.Printf("[CLIENT] sid=%d RESPONSE_RECEIVED wait_time=%v total_elapsed=%v", sid, time.Since(waitStart), time.Since(startTime))
stream := c.mux.GetStream(sid)
if stream == nil || len(stream.RecvBuf()) == 0 {
conn.Write([]byte{5, 4, 0, 1, 0, 0, 0, 0, 0, 0})
return
}
case <-timeout.C:
log.Printf("[CLIENT] sid=%d TIMEOUT after wait_time=%v total_elapsed=%v", sid, time.Since(waitStart), time.Since(startTime))
conn.Write([]byte{5, 4, 0, 1, 0, 0, 0, 0, 0, 0})
return
}
@@ -353,7 +335,6 @@ func (c *Client) handleSOCKS5(conn net.Conn, username, password string) {
c.mux.ReadStream(sid)
conn.Write([]byte{5, 0, 0, 1, 0, 0, 0, 0, 0, 0})
log.Printf("[CLIENT] sid=%d SOCKS5_READY total_elapsed=%v", sid, time.Since(startTime))
done := make(chan struct{})
streamClosed := make(chan struct{})
@@ -377,20 +358,22 @@ func (c *Client) handleSOCKS5(conn net.Conn, username, password string) {
defer close(streamClosed)
defer c.mux.CleanupDataChannel(sid)
ticker := time.NewTicker(10 * time.Millisecond)
defer ticker.Stop()
for {
dataReady := c.mux.WaitForData(sid)
select {
case <-done:
return
case <-dataReady:
for {
data := c.mux.ReadStream(sid)
if len(data) == 0 {
break
}
if _, err := conn.Write(data); err != nil {
return
case <-ticker.C:
data := c.mux.ReadStream(sid)
if len(data) > 0 {
for len(data) > 0 {
n, err := conn.Write(data)
if err != nil {
return
}
data = data[n:]
}
}

View File

@@ -65,7 +65,7 @@ func (m *Multiplexer) OpenStream() uint16 {
if m.nextID == 0 {
m.nextID = 1
}
if _, exists := m.streams[sid]; !exists {
m.streams[sid] = &Stream{
ID: sid,
@@ -84,13 +84,16 @@ func (m *Multiplexer) SendData(sid uint16, data []byte) error {
m.mu.RUnlock()
if !exists || stream.closed {
logger.Debug("SendData: stream %d not exists or closed", sid)
return nil
}
logger.Verbose("SendData: sid=%d, size=%d bytes", sid, len(data))
const chunkSize = 7168
totalChunks := (len(data) + chunkSize - 1) / chunkSize
if totalChunks > 10 {
logger.Debug("SendData: sid=%d, size=%d bytes, chunks=%d", sid, len(data), totalChunks)
}
const chunkSize = 4096
for i := 0; i < len(data); i += chunkSize {
end := i + chunkSize
if end > len(data) {
@@ -98,12 +101,12 @@ func (m *Multiplexer) SendData(sid uint16, data []byte) error {
}
chunk := data[i:end]
m.sendSeqMu.Lock()
seq := m.sendSeq[sid]
m.sendSeq[sid]++
m.sendSeqMu.Unlock()
frame := make([]byte, 12+len(chunk))
binary.BigEndian.PutUint32(frame[0:4], m.clientID)
binary.BigEndian.PutUint16(frame[4:6], sid)
@@ -112,7 +115,6 @@ func (m *Multiplexer) SendData(sid uint16, data []byte) error {
copy(frame[12:], chunk)
if err := m.onSend(frame); err != nil {
logger.Debug("[MUX] sid=%d onSend error: %v", sid, err)
return err
}
}
@@ -127,7 +129,7 @@ func (m *Multiplexer) CloseStream(sid uint16) error {
if stream, exists := m.streams[sid]; exists {
stream.closed = true
}
m.sendSeqMu.Lock()
delete(m.sendSeq, sid)
m.sendSeqMu.Unlock()
@@ -147,7 +149,7 @@ func (m *Multiplexer) HandleFrame(frame []byte) {
clientID := binary.BigEndian.Uint32(frame[0:4])
sid := binary.BigEndian.Uint16(frame[4:6])
length := binary.BigEndian.Uint16(frame[6:8])
if sid == 0xFFFF && length == 0xFFFF {
m.mu.Lock()
for streamSid, stream := range m.streams {
@@ -166,8 +168,6 @@ func (m *Multiplexer) HandleFrame(frame []byte) {
sid := binary.BigEndian.Uint16(frame[4:6])
length := binary.BigEndian.Uint16(frame[6:8])
seq := binary.BigEndian.Uint32(frame[8:12])
logger.Verbose("[MUX] HandleFrame sid=%d len=%d seq=%d", sid, length, seq)
if sid == 0xFFFF && length == 0xFFFF {
m.mu.Lock()
@@ -198,7 +198,7 @@ func (m *Multiplexer) HandleFrame(frame []byte) {
m.mu.Lock()
defer m.mu.Unlock()
stream, exists := m.streams[sid]
if !exists {
if len(m.streams) >= m.maxStreams {
@@ -219,7 +219,7 @@ func (m *Multiplexer) HandleFrame(frame []byte) {
stream.nextSeq = 0
stream.outOfOrder = make(map[uint32][]byte)
}
if seq == stream.nextSeq {
// Backpressure: if the stream buffer is full, release the mux lock and
// wait for the reader to drain it. Dropping/closing here would corrupt
@@ -253,7 +253,7 @@ func (m *Multiplexer) HandleFrame(frame []byte) {
stream.nextSeq++
logger.Verbose("Applied out-of-order packet sid=%d seq=%d", sid, stream.nextSeq-1)
}
m.dataReadyMu.Lock()
if ch, ok := m.dataReady[sid]; ok {
select {
@@ -265,10 +265,7 @@ func (m *Multiplexer) HandleFrame(frame []byte) {
} else if seq > stream.nextSeq {
if len(stream.outOfOrder) < 100 {
stream.outOfOrder[seq] = append([]byte(nil), data...)
logger.Verbose("Buffered out-of-order packet sid=%d seq=%d (expected %d)", sid, seq, stream.nextSeq)
}
} else {
logger.Verbose("Dropped duplicate packet sid=%d seq=%d (expected %d)", sid, seq, stream.nextSeq)
}
}
@@ -337,10 +334,10 @@ func (m *Multiplexer) Reset() {
for _, stream := range m.streams {
stream.closed = true
}
m.streams = make(map[uint16]*Stream)
m.nextID = 1
m.sendSeqMu.Lock()
m.sendSeq = make(map[uint16]uint32)
m.sendSeqMu.Unlock()
@@ -349,14 +346,14 @@ func (m *Multiplexer) Reset() {
func (m *Multiplexer) UpdateSendFunc(onSend func([]byte) error) {
m.mu.Lock()
defer m.mu.Unlock()
m.onSend = onSend
}
func (m *Multiplexer) WaitForData(sid uint16) <-chan struct{} {
m.dataReadyMu.Lock()
defer m.dataReadyMu.Unlock()
if _, ok := m.dataReady[sid]; !ok {
m.dataReady[sid] = make(chan struct{}, 1)
}
@@ -366,7 +363,7 @@ func (m *Multiplexer) WaitForData(sid uint16) <-chan struct{} {
func (m *Multiplexer) CleanupDataChannel(sid uint16) {
m.dataReadyMu.Lock()
defer m.dataReadyMu.Unlock()
if ch, ok := m.dataReady[sid]; ok {
close(ch)
delete(m.dataReady, sid)

View File

@@ -182,8 +182,6 @@ func (s *Server) onData(data []byte) {
return
}
logger.Verbose("Received %d bytes from client", len(plaintext))
if len(plaintext) >= 12 {
clientID := binary.BigEndian.Uint32(plaintext[0:4])
sid := binary.BigEndian.Uint16(plaintext[4:6])
@@ -261,7 +259,6 @@ func (s *Server) run(ctx context.Context) error {
go func(sid uint16) {
data := s.mux.ReadStream(sid)
if len(data) > 0 {
log.Printf("[SERVER] sid=%d READ_STREAM size=%d", sid, len(data))
s.connMu.RLock()
conn, exists := s.connections[sid]
s.connMu.RUnlock()
@@ -342,9 +339,7 @@ func (s *Server) handleConnect(sid uint16, req ConnectRequest) {
log.Printf("[SERVER] sid=%d CONNECT_SUCCESS dial_time=%v", sid, dialElapsed)
sendStart := time.Now()
s.mux.SendData(sid, []byte{0x00})
log.Printf("[SERVER] sid=%d RESPONSE_SENT send_time=%v total_elapsed=%v", sid, time.Since(sendStart), time.Since(startTime))
go func() {
defer func() {
@@ -355,9 +350,15 @@ func (s *Server) handleConnect(sid uint16, req ConnectRequest) {
}()
buf := make([]byte, 16384)
totalSent := uint64(0)
lastLog := time.Now()
for {
n, err := conn.Read(buf)
if err != nil {
if totalSent > 1024*1024 {
log.Printf("[SERVER] sid=%d TRANSFER_COMPLETE total=%d MB", sid, totalSent/(1024*1024))
}
return
}
@@ -368,6 +369,12 @@ func (s *Server) handleConnect(sid uint16, req ConnectRequest) {
if err := s.mux.SendData(sid, buf[:n]); err != nil {
return
}
totalSent += uint64(n)
if time.Since(lastLog) > 5*time.Second {
log.Printf("[SERVER] sid=%d TRANSFER_PROGRESS sent=%d MB", sid, totalSent/(1024*1024))
lastLog = time.Now()
}
}
}()
}

View File

@@ -86,7 +86,7 @@ func (p *Peer) Connect(ctx context.Context) error {
if err != nil {
return err
}
p.pcSub.OnConnectionStateChange(func(state webrtc.PeerConnectionState) {
log.Printf("Subscriber PeerConnection state: %s", state.String())
if state == webrtc.PeerConnectionStateFailed || state == webrtc.PeerConnectionStateDisconnected {
@@ -101,7 +101,7 @@ func (p *Peer) Connect(ctx context.Context) error {
if err != nil {
return err
}
p.pcPub.OnConnectionStateChange(func(state webrtc.PeerConnectionState) {
log.Printf("Publisher PeerConnection state: %s", state.String())
if state == webrtc.PeerConnectionStateFailed || state == webrtc.PeerConnectionStateDisconnected {
@@ -120,7 +120,7 @@ func (p *Peer) Connect(ctx context.Context) error {
dcReady := make(chan struct{})
p.dc.OnOpen(func() {
log.Println("DataChannel opened")
numWorkers := 4
for i := 0; i < numWorkers; i++ {
p.wg.Add(1)
@@ -129,16 +129,16 @@ func (p *Peer) Connect(ctx context.Context) error {
p.processSendQueue(workerID)
}(i)
}
p.wg.Add(1)
go func() {
defer p.wg.Done()
p.monitorQueue()
}()
close(dcReady)
})
p.dc.OnClose(func() {
log.Println("DataChannel closed")
if p.onReconnect != nil {
@@ -187,7 +187,7 @@ func (p *Peer) Connect(ctx context.Context) error {
ws.SetReadDeadline(time.Now().Add(60 * time.Second))
return nil
})
ws.SetReadDeadline(time.Now().Add(60 * time.Second))
p.wg.Add(1)
@@ -222,11 +222,11 @@ func (p *Peer) Send(data []byte) error {
if p.dc == nil || p.dc.ReadyState() != webrtc.DataChannelStateOpen {
return fmt.Errorf("datachannel not ready")
}
if p.sendQueueClosed.Load() {
return fmt.Errorf("send queue closed")
}
select {
case p.sendQueue <- data:
return nil
@@ -251,13 +251,13 @@ func (p *Peer) sendHello() error {
"name": p.name,
"role": "SPEAKER",
},
"sendAudio": false,
"sendVideo": false,
"sendSharing": false,
"participantId": p.conn.PeerID,
"roomId": p.conn.RoomID,
"serviceName": "telemost",
"credentials": p.conn.Credentials,
"sendAudio": false,
"sendVideo": false,
"sendSharing": false,
"participantId": p.conn.PeerID,
"roomId": p.conn.RoomID,
"serviceName": "telemost",
"credentials": p.conn.Credentials,
"capabilitiesOffer": map[string]interface{}{
"offerAnswerMode": []string{"SEPARATE"},
"initialSubscriberOffer": []string{"ON_HELLO"},
@@ -295,7 +295,7 @@ func (p *Peer) handleSignaling() {
}
return
}
p.wsMu.Lock()
if p.ws != nil {
p.ws.SetReadDeadline(time.Now().Add(60 * time.Second))
@@ -320,7 +320,7 @@ func (p *Peer) handleSignaling() {
p.sendPong(uid)
continue
}
if _, ok := msg["pong"]; ok {
p.sendAck(uid)
continue
@@ -432,7 +432,7 @@ func (p *Peer) handleICE(cand map[string]interface{}) {
func (p *Peer) sendAck(uid string) {
p.wsMu.Lock()
defer p.wsMu.Unlock()
p.ws.WriteJSON(map[string]interface{}{
"uid": uid,
"ack": map[string]interface{}{
@@ -446,9 +446,9 @@ func (p *Peer) sendAck(uid string) {
func (p *Peer) sendPong(uid string) {
p.wsMu.Lock()
defer p.wsMu.Unlock()
p.ws.WriteJSON(map[string]interface{}{
"uid": uid,
"uid": uid,
"pong": map[string]interface{}{},
})
}
@@ -464,11 +464,11 @@ func (p *Peer) setupICEHandlers() {
p.ws.WriteJSON(map[string]interface{}{
"uid": uuid.New().String(),
"webrtcIceCandidate": map[string]interface{}{
"candidate": init.Candidate,
"sdpMid": init.SDPMid,
"candidate": init.Candidate,
"sdpMid": init.SDPMid,
"sdpMlineIndex": init.SDPMLineIndex,
"target": "SUBSCRIBER",
"pcSeq": 1,
"target": "SUBSCRIBER",
"pcSeq": 1,
},
})
p.wsMu.Unlock()
@@ -484,11 +484,11 @@ func (p *Peer) setupICEHandlers() {
p.ws.WriteJSON(map[string]interface{}{
"uid": uuid.New().String(),
"webrtcIceCandidate": map[string]interface{}{
"candidate": init.Candidate,
"sdpMid": init.SDPMid,
"candidate": init.Candidate,
"sdpMid": init.SDPMid,
"sdpMlineIndex": init.SDPMLineIndex,
"target": "PUBLISHER",
"pcSeq": 1,
"target": "PUBLISHER",
"pcSeq": 1,
},
})
p.wsMu.Unlock()
@@ -498,17 +498,17 @@ func (p *Peer) setupICEHandlers() {
func (p *Peer) sendLeave() {
p.wsMu.Lock()
defer p.wsMu.Unlock()
if p.ws == nil {
log.Println("WebSocket already closed, cannot send leave")
return
}
leave := map[string]interface{}{
"uid": uuid.New().String(),
"leave": map[string]interface{}{},
}
if err := p.ws.WriteJSON(leave); err != nil {
log.Printf("Failed to send leave: %v", err)
} else {
@@ -518,14 +518,14 @@ func (p *Peer) sendLeave() {
func (p *Peer) Close() error {
log.Println("Closing peer connection...")
p.sendQueueClosed.Store(true)
log.Println("Sending leave message...")
p.sendLeave()
time.Sleep(1 * time.Second)
log.Println("Closing channels...")
if p.closeCh != nil {
select {
@@ -534,36 +534,36 @@ func (p *Peer) Close() error {
close(p.closeCh)
}
}
log.Println("Waiting for goroutines...")
done := make(chan struct{})
go func() {
p.wg.Wait()
close(done)
}()
select {
case <-done:
log.Println("All goroutines finished")
case <-time.After(2 * time.Second):
log.Println("Goroutine wait timeout")
}
if p.dc != nil {
log.Println("Closing DataChannel...")
p.dc.Close()
}
if p.pcPub != nil {
log.Println("Closing Publisher PeerConnection...")
p.pcPub.Close()
}
if p.pcSub != nil {
log.Println("Closing Subscriber PeerConnection...")
p.pcSub.Close()
}
if p.ws != nil {
log.Println("Closing WebSocket...")
p.wsMu.Lock()
@@ -571,7 +571,7 @@ func (p *Peer) Close() error {
p.ws.Close()
p.wsMu.Unlock()
}
log.Println("Peer closed")
return nil
}
@@ -579,7 +579,7 @@ func (p *Peer) Close() error {
func (p *Peer) keepAlive() {
wsPingTicker := time.NewTicker(30 * time.Second)
defer wsPingTicker.Stop()
appPingTicker := time.NewTicker(5 * time.Second)
defer appPingTicker.Stop()
@@ -626,49 +626,49 @@ func (p *Peer) keepAlive() {
func (p *Peer) reconnect(ctx context.Context) error {
log.Println("Reconnecting...")
p.sendLeave()
time.Sleep(500 * time.Millisecond)
close(p.keepAliveCh)
if p.dc != nil {
p.dc.Close()
}
if p.pcPub != nil {
p.pcPub.Close()
}
if p.pcSub != nil {
p.pcSub.Close()
}
if p.ws != nil {
p.wsMu.Lock()
p.ws.WriteControl(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""), time.Now().Add(time.Second))
p.ws.Close()
p.wsMu.Unlock()
}
time.Sleep(3 * time.Second)
p.keepAliveCh = make(chan struct{})
conn, err := GetConnectionInfo(p.roomURL, p.name)
if err != nil {
return err
}
p.conn = conn
if err := p.Connect(ctx); err != nil {
return err
}
if p.onReconnect != nil {
p.onReconnect(p.dc)
}
return nil
}
@@ -679,7 +679,7 @@ func (p *Peer) SetReconnectCallback(cb func(*webrtc.DataChannel)) {
func (p *Peer) WatchConnection(ctx context.Context) {
const maxReconnects = 10
const reconnectWindow = 5 * time.Minute
for {
select {
case <-p.reconnectCh:
@@ -688,22 +688,22 @@ func (p *Peer) WatchConnection(ctx context.Context) {
if now.Sub(p.lastReconnect) > reconnectWindow {
p.reconnectCount = 0
}
if p.reconnectCount >= maxReconnects {
log.Printf("Max reconnect attempts (%d) reached, stopping", maxReconnects)
p.reconnectMu.Unlock()
return
}
p.reconnectCount++
p.lastReconnect = now
p.reconnectMu.Unlock()
backoff := time.Duration(p.reconnectCount) * 2 * time.Second
if backoff > 30*time.Second {
backoff = 30 * time.Second
}
for {
if err := p.reconnect(ctx); err != nil {
log.Printf("Reconnect failed: %v, retrying in %v...", err, backoff)
@@ -724,7 +724,7 @@ func (p *Peer) WatchConnection(ctx context.Context) {
func (p *Peer) processSendQueue(workerID int) {
log.Printf("[WORKER-%d] Started", workerID)
defer log.Printf("[WORKER-%d] Stopped", workerID)
for {
select {
case data := <-p.sendQueue:
@@ -733,7 +733,7 @@ func (p *Peer) processSendQueue(workerID int) {
}
// Wait until SCTP buffer drains. Dropping here would corrupt the
// carried TCP streams (the mux is a reliable transport) large
// carried TCP streams (the mux is a reliable transport); large
// downloads like Instagram/Twitter assets would hang forever
// waiting for the missing bytes. Backpressure already propagates
// upstream via CanSend() / the sendQueue length.
@@ -768,7 +768,7 @@ func (p *Peer) processSendQueue(workerID int) {
workerID, len(data), p.dc.BufferedAmount())
}
}
case <-p.closeCh:
return
}
@@ -778,7 +778,7 @@ func (p *Peer) processSendQueue(workerID int) {
func (p *Peer) monitorQueue() {
ticker := time.NewTicker(3 * time.Second)
defer ticker.Stop()
for {
select {
case <-ticker.C:
@@ -787,8 +787,8 @@ func (p *Peer) monitorQueue() {
if p.dc != nil {
buffered = p.dc.BufferedAmount()
}
if queueLen > 1000 || buffered > 3*1024*1024 {
log.Printf("[QUEUE_MONITOR] queue_len=%d dc_buffered=%d", queueLen, buffered)
if queueLen > 800 || buffered > 3*1024*1024 {
log.Printf("[QUEUE_MONITOR] queue_len=%d dc_buffered=%d MB", queueLen, buffered/(1024*1024))
}
case <-p.closeCh:
return
@@ -797,5 +797,10 @@ func (p *Peer) monitorQueue() {
}
func (p *Peer) CanSend() bool {
return len(p.sendQueue) < 3000
queueLen := len(p.sendQueue)
buffered := uint64(0)
if p.dc != nil {
buffered = p.dc.BufferedAmount()
}
return queueLen < 1000 && buffered < 3*1024*1024
}

View File

@@ -16,15 +16,21 @@ Project that allows users to bypass blocking by parasitizing and tunneling on un
## satus
pre-alpha
<br>
see all info in [issues](https://github.com/openlibrecommunity/olcrtc/issues)
<br>
issues? contact us at [@openlibrecommunity](https://t.me/openlibrecommunity)
<br>
or wait for the release or at least a beta
## fast start
```bash
# server
# server ( podman, pre configured, easy )
./srv.sh
# client
# client ( podman, pre configured, easy
./cnc.sh
# or native ( no podman ) linux

3
srv.sh
View File

@@ -1,5 +1,8 @@
#!/bin/bash
echo "ЕСЛИ У ВАС ЕСТЬ ПРОБЛЕМЫ - Я В КУРСЕ, ПРОЕКТ В БЕТЕ, ПО ПРОБЛЕМАМ В ЧАТ t.me/openlibrecommunity ИЛИ ВООБЩЕ НЕКУДА, ЖДИТЕ РЕЛИЗА"
set -e
CONTAINER_NAME="olcrtc-server"