diff --git a/cnc.sh b/cnc.sh index 04a5e8f..1090b17 100755 --- a/cnc.sh +++ b/cnc.sh @@ -1,5 +1,8 @@ #!/bin/bash +echo "ЕСЛИ У ВАС ЕСТЬ ПРОБЛЕМЫ - Я В КУРСЕ, ПРОЕКТ В БЕТЕ, ПО ПРОБЛЕМАМ В ЧАТ t.me/openlibrecommunity ИЛИ ВООБЩЕ НЕКУДА, ЖДИТЕ РЕЛИЗА" + + set -e CONTAINER_NAME="olcrtc-client" diff --git a/internal/client/client.go b/internal/client/client.go index 9ddf8a3..8188fc0 100644 --- a/internal/client/client.go +++ b/internal/client/client.go @@ -166,7 +166,6 @@ func (c *Client) onData(data []byte) { return } - logger.Verbose("Received %d bytes from server", len(plaintext)) c.mux.HandleFrame(plaintext) } @@ -208,7 +207,6 @@ func (c *Client) runSOCKS5(ctx context.Context, port int, username, password str func (c *Client) handleSOCKS5(conn net.Conn, username, password string) { defer conn.Close() - startTime := time.Now() buf := make([]byte, 513) @@ -316,36 +314,20 @@ func (c *Client) handleSOCKS5(conn net.Conn, username, password string) { } reqData, _ := json.Marshal(req) - sendTime := time.Now() - - queueLen := 0 - buffered := uint64(0) - for _, peer := range c.peers { - if peer != nil { - queueLen += len(peer.GetSendQueue()) - buffered += peer.GetBufferedAmount() - } - } - c.mux.SendData(sid, reqData) - log.Printf("[CLIENT] sid=%d SEND_REQUEST elapsed=%v queue_len=%d dc_buffered=%d", - sid, time.Since(sendTime), queueLen, buffered) dataReady := c.mux.WaitForData(sid) timeout := time.NewTimer(10 * time.Second) defer timeout.Stop() - waitStart := time.Now() select { case <-dataReady: - log.Printf("[CLIENT] sid=%d RESPONSE_RECEIVED wait_time=%v total_elapsed=%v", sid, time.Since(waitStart), time.Since(startTime)) stream := c.mux.GetStream(sid) if stream == nil || len(stream.RecvBuf()) == 0 { conn.Write([]byte{5, 4, 0, 1, 0, 0, 0, 0, 0, 0}) return } case <-timeout.C: - log.Printf("[CLIENT] sid=%d TIMEOUT after wait_time=%v total_elapsed=%v", sid, time.Since(waitStart), time.Since(startTime)) conn.Write([]byte{5, 4, 0, 1, 0, 0, 0, 0, 0, 0}) return } @@ -353,7 +335,6 @@ func (c *Client) handleSOCKS5(conn net.Conn, username, password string) { c.mux.ReadStream(sid) conn.Write([]byte{5, 0, 0, 1, 0, 0, 0, 0, 0, 0}) - log.Printf("[CLIENT] sid=%d SOCKS5_READY total_elapsed=%v", sid, time.Since(startTime)) done := make(chan struct{}) streamClosed := make(chan struct{}) @@ -377,20 +358,22 @@ func (c *Client) handleSOCKS5(conn net.Conn, username, password string) { defer close(streamClosed) defer c.mux.CleanupDataChannel(sid) + ticker := time.NewTicker(10 * time.Millisecond) + defer ticker.Stop() + for { - dataReady := c.mux.WaitForData(sid) - select { case <-done: return - case <-dataReady: - for { - data := c.mux.ReadStream(sid) - if len(data) == 0 { - break - } - if _, err := conn.Write(data); err != nil { - return + case <-ticker.C: + data := c.mux.ReadStream(sid) + if len(data) > 0 { + for len(data) > 0 { + n, err := conn.Write(data) + if err != nil { + return + } + data = data[n:] } } diff --git a/internal/mux/mux.go b/internal/mux/mux.go index c0de57a..5f25fb7 100644 --- a/internal/mux/mux.go +++ b/internal/mux/mux.go @@ -65,7 +65,7 @@ func (m *Multiplexer) OpenStream() uint16 { if m.nextID == 0 { m.nextID = 1 } - + if _, exists := m.streams[sid]; !exists { m.streams[sid] = &Stream{ ID: sid, @@ -84,13 +84,16 @@ func (m *Multiplexer) SendData(sid uint16, data []byte) error { m.mu.RUnlock() if !exists || stream.closed { - logger.Debug("SendData: stream %d not exists or closed", sid) return nil } - logger.Verbose("SendData: sid=%d, size=%d bytes", sid, len(data)) + const chunkSize = 7168 + totalChunks := (len(data) + chunkSize - 1) / chunkSize + + if totalChunks > 10 { + logger.Debug("SendData: sid=%d, size=%d bytes, chunks=%d", sid, len(data), totalChunks) + } - const chunkSize = 4096 for i := 0; i < len(data); i += chunkSize { end := i + chunkSize if end > len(data) { @@ -98,12 +101,12 @@ func (m *Multiplexer) SendData(sid uint16, data []byte) error { } chunk := data[i:end] - + m.sendSeqMu.Lock() seq := m.sendSeq[sid] m.sendSeq[sid]++ m.sendSeqMu.Unlock() - + frame := make([]byte, 12+len(chunk)) binary.BigEndian.PutUint32(frame[0:4], m.clientID) binary.BigEndian.PutUint16(frame[4:6], sid) @@ -112,7 +115,6 @@ func (m *Multiplexer) SendData(sid uint16, data []byte) error { copy(frame[12:], chunk) if err := m.onSend(frame); err != nil { - logger.Debug("[MUX] sid=%d onSend error: %v", sid, err) return err } } @@ -127,7 +129,7 @@ func (m *Multiplexer) CloseStream(sid uint16) error { if stream, exists := m.streams[sid]; exists { stream.closed = true } - + m.sendSeqMu.Lock() delete(m.sendSeq, sid) m.sendSeqMu.Unlock() @@ -147,7 +149,7 @@ func (m *Multiplexer) HandleFrame(frame []byte) { clientID := binary.BigEndian.Uint32(frame[0:4]) sid := binary.BigEndian.Uint16(frame[4:6]) length := binary.BigEndian.Uint16(frame[6:8]) - + if sid == 0xFFFF && length == 0xFFFF { m.mu.Lock() for streamSid, stream := range m.streams { @@ -166,8 +168,6 @@ func (m *Multiplexer) HandleFrame(frame []byte) { sid := binary.BigEndian.Uint16(frame[4:6]) length := binary.BigEndian.Uint16(frame[6:8]) seq := binary.BigEndian.Uint32(frame[8:12]) - - logger.Verbose("[MUX] HandleFrame sid=%d len=%d seq=%d", sid, length, seq) if sid == 0xFFFF && length == 0xFFFF { m.mu.Lock() @@ -198,7 +198,7 @@ func (m *Multiplexer) HandleFrame(frame []byte) { m.mu.Lock() defer m.mu.Unlock() - + stream, exists := m.streams[sid] if !exists { if len(m.streams) >= m.maxStreams { @@ -219,7 +219,7 @@ func (m *Multiplexer) HandleFrame(frame []byte) { stream.nextSeq = 0 stream.outOfOrder = make(map[uint32][]byte) } - + if seq == stream.nextSeq { // Backpressure: if the stream buffer is full, release the mux lock and // wait for the reader to drain it. Dropping/closing here would corrupt @@ -253,7 +253,7 @@ func (m *Multiplexer) HandleFrame(frame []byte) { stream.nextSeq++ logger.Verbose("Applied out-of-order packet sid=%d seq=%d", sid, stream.nextSeq-1) } - + m.dataReadyMu.Lock() if ch, ok := m.dataReady[sid]; ok { select { @@ -265,10 +265,7 @@ func (m *Multiplexer) HandleFrame(frame []byte) { } else if seq > stream.nextSeq { if len(stream.outOfOrder) < 100 { stream.outOfOrder[seq] = append([]byte(nil), data...) - logger.Verbose("Buffered out-of-order packet sid=%d seq=%d (expected %d)", sid, seq, stream.nextSeq) } - } else { - logger.Verbose("Dropped duplicate packet sid=%d seq=%d (expected %d)", sid, seq, stream.nextSeq) } } @@ -337,10 +334,10 @@ func (m *Multiplexer) Reset() { for _, stream := range m.streams { stream.closed = true } - + m.streams = make(map[uint16]*Stream) m.nextID = 1 - + m.sendSeqMu.Lock() m.sendSeq = make(map[uint16]uint32) m.sendSeqMu.Unlock() @@ -349,14 +346,14 @@ func (m *Multiplexer) Reset() { func (m *Multiplexer) UpdateSendFunc(onSend func([]byte) error) { m.mu.Lock() defer m.mu.Unlock() - + m.onSend = onSend } func (m *Multiplexer) WaitForData(sid uint16) <-chan struct{} { m.dataReadyMu.Lock() defer m.dataReadyMu.Unlock() - + if _, ok := m.dataReady[sid]; !ok { m.dataReady[sid] = make(chan struct{}, 1) } @@ -366,7 +363,7 @@ func (m *Multiplexer) WaitForData(sid uint16) <-chan struct{} { func (m *Multiplexer) CleanupDataChannel(sid uint16) { m.dataReadyMu.Lock() defer m.dataReadyMu.Unlock() - + if ch, ok := m.dataReady[sid]; ok { close(ch) delete(m.dataReady, sid) diff --git a/internal/server/server.go b/internal/server/server.go index 3dcfbd6..75efcc4 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -182,8 +182,6 @@ func (s *Server) onData(data []byte) { return } - logger.Verbose("Received %d bytes from client", len(plaintext)) - if len(plaintext) >= 12 { clientID := binary.BigEndian.Uint32(plaintext[0:4]) sid := binary.BigEndian.Uint16(plaintext[4:6]) @@ -261,7 +259,6 @@ func (s *Server) run(ctx context.Context) error { go func(sid uint16) { data := s.mux.ReadStream(sid) if len(data) > 0 { - log.Printf("[SERVER] sid=%d READ_STREAM size=%d", sid, len(data)) s.connMu.RLock() conn, exists := s.connections[sid] s.connMu.RUnlock() @@ -342,9 +339,7 @@ func (s *Server) handleConnect(sid uint16, req ConnectRequest) { log.Printf("[SERVER] sid=%d CONNECT_SUCCESS dial_time=%v", sid, dialElapsed) - sendStart := time.Now() s.mux.SendData(sid, []byte{0x00}) - log.Printf("[SERVER] sid=%d RESPONSE_SENT send_time=%v total_elapsed=%v", sid, time.Since(sendStart), time.Since(startTime)) go func() { defer func() { @@ -355,9 +350,15 @@ func (s *Server) handleConnect(sid uint16, req ConnectRequest) { }() buf := make([]byte, 16384) + totalSent := uint64(0) + lastLog := time.Now() + for { n, err := conn.Read(buf) if err != nil { + if totalSent > 1024*1024 { + log.Printf("[SERVER] sid=%d TRANSFER_COMPLETE total=%d MB", sid, totalSent/(1024*1024)) + } return } @@ -368,6 +369,12 @@ func (s *Server) handleConnect(sid uint16, req ConnectRequest) { if err := s.mux.SendData(sid, buf[:n]); err != nil { return } + + totalSent += uint64(n) + if time.Since(lastLog) > 5*time.Second { + log.Printf("[SERVER] sid=%d TRANSFER_PROGRESS sent=%d MB", sid, totalSent/(1024*1024)) + lastLog = time.Now() + } } }() } diff --git a/internal/telemost/peer.go b/internal/telemost/peer.go index a8c8418..633e003 100644 --- a/internal/telemost/peer.go +++ b/internal/telemost/peer.go @@ -86,7 +86,7 @@ func (p *Peer) Connect(ctx context.Context) error { if err != nil { return err } - + p.pcSub.OnConnectionStateChange(func(state webrtc.PeerConnectionState) { log.Printf("Subscriber PeerConnection state: %s", state.String()) if state == webrtc.PeerConnectionStateFailed || state == webrtc.PeerConnectionStateDisconnected { @@ -101,7 +101,7 @@ func (p *Peer) Connect(ctx context.Context) error { if err != nil { return err } - + p.pcPub.OnConnectionStateChange(func(state webrtc.PeerConnectionState) { log.Printf("Publisher PeerConnection state: %s", state.String()) if state == webrtc.PeerConnectionStateFailed || state == webrtc.PeerConnectionStateDisconnected { @@ -120,7 +120,7 @@ func (p *Peer) Connect(ctx context.Context) error { dcReady := make(chan struct{}) p.dc.OnOpen(func() { log.Println("DataChannel opened") - + numWorkers := 4 for i := 0; i < numWorkers; i++ { p.wg.Add(1) @@ -129,16 +129,16 @@ func (p *Peer) Connect(ctx context.Context) error { p.processSendQueue(workerID) }(i) } - + p.wg.Add(1) go func() { defer p.wg.Done() p.monitorQueue() }() - + close(dcReady) }) - + p.dc.OnClose(func() { log.Println("DataChannel closed") if p.onReconnect != nil { @@ -187,7 +187,7 @@ func (p *Peer) Connect(ctx context.Context) error { ws.SetReadDeadline(time.Now().Add(60 * time.Second)) return nil }) - + ws.SetReadDeadline(time.Now().Add(60 * time.Second)) p.wg.Add(1) @@ -222,11 +222,11 @@ func (p *Peer) Send(data []byte) error { if p.dc == nil || p.dc.ReadyState() != webrtc.DataChannelStateOpen { return fmt.Errorf("datachannel not ready") } - + if p.sendQueueClosed.Load() { return fmt.Errorf("send queue closed") } - + select { case p.sendQueue <- data: return nil @@ -251,13 +251,13 @@ func (p *Peer) sendHello() error { "name": p.name, "role": "SPEAKER", }, - "sendAudio": false, - "sendVideo": false, - "sendSharing": false, - "participantId": p.conn.PeerID, - "roomId": p.conn.RoomID, - "serviceName": "telemost", - "credentials": p.conn.Credentials, + "sendAudio": false, + "sendVideo": false, + "sendSharing": false, + "participantId": p.conn.PeerID, + "roomId": p.conn.RoomID, + "serviceName": "telemost", + "credentials": p.conn.Credentials, "capabilitiesOffer": map[string]interface{}{ "offerAnswerMode": []string{"SEPARATE"}, "initialSubscriberOffer": []string{"ON_HELLO"}, @@ -295,7 +295,7 @@ func (p *Peer) handleSignaling() { } return } - + p.wsMu.Lock() if p.ws != nil { p.ws.SetReadDeadline(time.Now().Add(60 * time.Second)) @@ -320,7 +320,7 @@ func (p *Peer) handleSignaling() { p.sendPong(uid) continue } - + if _, ok := msg["pong"]; ok { p.sendAck(uid) continue @@ -432,7 +432,7 @@ func (p *Peer) handleICE(cand map[string]interface{}) { func (p *Peer) sendAck(uid string) { p.wsMu.Lock() defer p.wsMu.Unlock() - + p.ws.WriteJSON(map[string]interface{}{ "uid": uid, "ack": map[string]interface{}{ @@ -446,9 +446,9 @@ func (p *Peer) sendAck(uid string) { func (p *Peer) sendPong(uid string) { p.wsMu.Lock() defer p.wsMu.Unlock() - + p.ws.WriteJSON(map[string]interface{}{ - "uid": uid, + "uid": uid, "pong": map[string]interface{}{}, }) } @@ -464,11 +464,11 @@ func (p *Peer) setupICEHandlers() { p.ws.WriteJSON(map[string]interface{}{ "uid": uuid.New().String(), "webrtcIceCandidate": map[string]interface{}{ - "candidate": init.Candidate, - "sdpMid": init.SDPMid, + "candidate": init.Candidate, + "sdpMid": init.SDPMid, "sdpMlineIndex": init.SDPMLineIndex, - "target": "SUBSCRIBER", - "pcSeq": 1, + "target": "SUBSCRIBER", + "pcSeq": 1, }, }) p.wsMu.Unlock() @@ -484,11 +484,11 @@ func (p *Peer) setupICEHandlers() { p.ws.WriteJSON(map[string]interface{}{ "uid": uuid.New().String(), "webrtcIceCandidate": map[string]interface{}{ - "candidate": init.Candidate, - "sdpMid": init.SDPMid, + "candidate": init.Candidate, + "sdpMid": init.SDPMid, "sdpMlineIndex": init.SDPMLineIndex, - "target": "PUBLISHER", - "pcSeq": 1, + "target": "PUBLISHER", + "pcSeq": 1, }, }) p.wsMu.Unlock() @@ -498,17 +498,17 @@ func (p *Peer) setupICEHandlers() { func (p *Peer) sendLeave() { p.wsMu.Lock() defer p.wsMu.Unlock() - + if p.ws == nil { log.Println("WebSocket already closed, cannot send leave") return } - + leave := map[string]interface{}{ "uid": uuid.New().String(), "leave": map[string]interface{}{}, } - + if err := p.ws.WriteJSON(leave); err != nil { log.Printf("Failed to send leave: %v", err) } else { @@ -518,14 +518,14 @@ func (p *Peer) sendLeave() { func (p *Peer) Close() error { log.Println("Closing peer connection...") - + p.sendQueueClosed.Store(true) - + log.Println("Sending leave message...") p.sendLeave() - + time.Sleep(1 * time.Second) - + log.Println("Closing channels...") if p.closeCh != nil { select { @@ -534,36 +534,36 @@ func (p *Peer) Close() error { close(p.closeCh) } } - + log.Println("Waiting for goroutines...") done := make(chan struct{}) go func() { p.wg.Wait() close(done) }() - + select { case <-done: log.Println("All goroutines finished") case <-time.After(2 * time.Second): log.Println("Goroutine wait timeout") } - + if p.dc != nil { log.Println("Closing DataChannel...") p.dc.Close() } - + if p.pcPub != nil { log.Println("Closing Publisher PeerConnection...") p.pcPub.Close() } - + if p.pcSub != nil { log.Println("Closing Subscriber PeerConnection...") p.pcSub.Close() } - + if p.ws != nil { log.Println("Closing WebSocket...") p.wsMu.Lock() @@ -571,7 +571,7 @@ func (p *Peer) Close() error { p.ws.Close() p.wsMu.Unlock() } - + log.Println("Peer closed") return nil } @@ -579,7 +579,7 @@ func (p *Peer) Close() error { func (p *Peer) keepAlive() { wsPingTicker := time.NewTicker(30 * time.Second) defer wsPingTicker.Stop() - + appPingTicker := time.NewTicker(5 * time.Second) defer appPingTicker.Stop() @@ -626,49 +626,49 @@ func (p *Peer) keepAlive() { func (p *Peer) reconnect(ctx context.Context) error { log.Println("Reconnecting...") - + p.sendLeave() time.Sleep(500 * time.Millisecond) - + close(p.keepAliveCh) - + if p.dc != nil { p.dc.Close() } - + if p.pcPub != nil { p.pcPub.Close() } - + if p.pcSub != nil { p.pcSub.Close() } - + if p.ws != nil { p.wsMu.Lock() p.ws.WriteControl(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""), time.Now().Add(time.Second)) p.ws.Close() p.wsMu.Unlock() } - + time.Sleep(3 * time.Second) - + p.keepAliveCh = make(chan struct{}) - + conn, err := GetConnectionInfo(p.roomURL, p.name) if err != nil { return err } p.conn = conn - + if err := p.Connect(ctx); err != nil { return err } - + if p.onReconnect != nil { p.onReconnect(p.dc) } - + return nil } @@ -679,7 +679,7 @@ func (p *Peer) SetReconnectCallback(cb func(*webrtc.DataChannel)) { func (p *Peer) WatchConnection(ctx context.Context) { const maxReconnects = 10 const reconnectWindow = 5 * time.Minute - + for { select { case <-p.reconnectCh: @@ -688,22 +688,22 @@ func (p *Peer) WatchConnection(ctx context.Context) { if now.Sub(p.lastReconnect) > reconnectWindow { p.reconnectCount = 0 } - + if p.reconnectCount >= maxReconnects { log.Printf("Max reconnect attempts (%d) reached, stopping", maxReconnects) p.reconnectMu.Unlock() return } - + p.reconnectCount++ p.lastReconnect = now p.reconnectMu.Unlock() - + backoff := time.Duration(p.reconnectCount) * 2 * time.Second if backoff > 30*time.Second { backoff = 30 * time.Second } - + for { if err := p.reconnect(ctx); err != nil { log.Printf("Reconnect failed: %v, retrying in %v...", err, backoff) @@ -724,7 +724,7 @@ func (p *Peer) WatchConnection(ctx context.Context) { func (p *Peer) processSendQueue(workerID int) { log.Printf("[WORKER-%d] Started", workerID) defer log.Printf("[WORKER-%d] Stopped", workerID) - + for { select { case data := <-p.sendQueue: @@ -733,7 +733,7 @@ func (p *Peer) processSendQueue(workerID int) { } // Wait until SCTP buffer drains. Dropping here would corrupt the - // carried TCP streams (the mux is a reliable transport) — large + // carried TCP streams (the mux is a reliable transport); large // downloads like Instagram/Twitter assets would hang forever // waiting for the missing bytes. Backpressure already propagates // upstream via CanSend() / the sendQueue length. @@ -768,7 +768,7 @@ func (p *Peer) processSendQueue(workerID int) { workerID, len(data), p.dc.BufferedAmount()) } } - + case <-p.closeCh: return } @@ -778,7 +778,7 @@ func (p *Peer) processSendQueue(workerID int) { func (p *Peer) monitorQueue() { ticker := time.NewTicker(3 * time.Second) defer ticker.Stop() - + for { select { case <-ticker.C: @@ -787,8 +787,8 @@ func (p *Peer) monitorQueue() { if p.dc != nil { buffered = p.dc.BufferedAmount() } - if queueLen > 1000 || buffered > 3*1024*1024 { - log.Printf("[QUEUE_MONITOR] queue_len=%d dc_buffered=%d", queueLen, buffered) + if queueLen > 800 || buffered > 3*1024*1024 { + log.Printf("[QUEUE_MONITOR] queue_len=%d dc_buffered=%d MB", queueLen, buffered/(1024*1024)) } case <-p.closeCh: return @@ -797,5 +797,10 @@ func (p *Peer) monitorQueue() { } func (p *Peer) CanSend() bool { - return len(p.sendQueue) < 3000 + queueLen := len(p.sendQueue) + buffered := uint64(0) + if p.dc != nil { + buffered = p.dc.BufferedAmount() + } + return queueLen < 1000 && buffered < 3*1024*1024 } diff --git a/readme.md b/readme.md index 789461e..1dd9786 100644 --- a/readme.md +++ b/readme.md @@ -16,15 +16,21 @@ Project that allows users to bypass blocking by parasitizing and tunneling on un ## satus pre-alpha +
see all info in [issues](https://github.com/openlibrecommunity/olcrtc/issues) +
+issues? contact us at [@openlibrecommunity](https://t.me/openlibrecommunity) +
+or wait for the release or at least a beta + ## fast start ```bash -# server +# server ( podman, pre configured, easy ) ./srv.sh -# client +# client ( podman, pre configured, easy ./cnc.sh # or native ( no podman ) linux diff --git a/srv.sh b/srv.sh index 53da763..baaa6cc 100755 --- a/srv.sh +++ b/srv.sh @@ -1,5 +1,8 @@ #!/bin/bash + +echo "ЕСЛИ У ВАС ЕСТЬ ПРОБЛЕМЫ - Я В КУРСЕ, ПРОЕКТ В БЕТЕ, ПО ПРОБЛЕМАМ В ЧАТ t.me/openlibrecommunity ИЛИ ВООБЩЕ НЕКУДА, ЖДИТЕ РЕЛИЗА" + set -e CONTAINER_NAME="olcrtc-server"