Files
3x-ui/web/job/node_traffic_sync_job.go
MHSanaei 3af2da0142 fix(online): scope online status per node instead of a global union
The inbounds page and Nodes page checked each client's email against a
single deduped union of every node's online clients, so a client connected
to one node showed as online on every inbound across every node. The local
online set was also derived from the email-keyed client_traffics.last_online
column, which remote-node syncs bump too, leaking remote-only clients onto
local inbounds.

Track online clients per node: the local panel's own xray clients under key
0 (derived from live traffic-poll deltas via RefreshLocalOnline, kept in
memory and independent of the shared last_online column) and each remote
node under its id. Add GetOnlineClientsByNode plus a /clients/onlinesByNode
endpoint and onlineByNode WS field; node.go and the inbounds rollup now scope
online by node. The flat GetOnlineClients union is kept for client-centric and
total-count views (Clients page, dashboard, telegram).

Closes #4809
2026-06-02 18:33:21 +02:00

176 lines
4.3 KiB
Go

package job
import (
"context"
"sync"
"time"
"github.com/mhsanaei/3x-ui/v3/database/model"
"github.com/mhsanaei/3x-ui/v3/logger"
"github.com/mhsanaei/3x-ui/v3/web/runtime"
"github.com/mhsanaei/3x-ui/v3/web/service"
"github.com/mhsanaei/3x-ui/v3/web/websocket"
)
const (
nodeTrafficSyncConcurrency = 8
nodeTrafficSyncRequestTimeout = 4 * time.Second
)
type NodeTrafficSyncJob struct {
nodeService service.NodeService
inboundService service.InboundService
settingService service.SettingService
xrayService service.XrayService
running sync.Mutex
structural atomicBool
}
type atomicBool struct {
mu sync.Mutex
v bool
}
func (a *atomicBool) set() {
a.mu.Lock()
a.v = true
a.mu.Unlock()
}
func (a *atomicBool) takeAndReset() bool {
a.mu.Lock()
v := a.v
a.v = false
a.mu.Unlock()
return v
}
func NewNodeTrafficSyncJob() *NodeTrafficSyncJob {
return &NodeTrafficSyncJob{}
}
func (j *NodeTrafficSyncJob) Run() {
if !j.running.TryLock() {
return
}
defer j.running.Unlock()
mgr := runtime.GetManager()
if mgr == nil {
return
}
nodes, err := j.nodeService.GetAll()
if err != nil {
logger.Warning("node traffic sync: load nodes failed:", err)
return
}
if len(nodes) == 0 {
return
}
sem := make(chan struct{}, nodeTrafficSyncConcurrency)
var wg sync.WaitGroup
for _, n := range nodes {
if !n.Enable || n.Status != "online" {
continue
}
wg.Add(1)
sem <- struct{}{}
go func(n *model.Node) {
defer wg.Done()
defer func() { <-sem }()
j.syncOne(mgr, n)
}(n)
}
wg.Wait()
_, clientsDisabled, err := j.inboundService.AddTraffic(nil, nil)
if err != nil {
logger.Warning("node traffic sync: depletion check failed:", err)
}
if clientsDisabled {
if restartOnDisable, settingErr := j.settingService.GetRestartXrayOnClientDisable(); settingErr == nil && restartOnDisable {
if err := j.xrayService.RestartXray(true); err != nil {
logger.Warning("node traffic sync: restart xray after disabling clients failed:", err)
j.xrayService.SetToNeedRestart()
}
} else if settingErr != nil {
logger.Warning("node traffic sync: get RestartXrayOnClientDisable failed:", settingErr)
}
j.structural.set()
}
lastOnline, err := j.inboundService.GetClientsLastOnline()
if err != nil {
logger.Warning("node traffic sync: get last-online failed:", err)
}
if lastOnline == nil {
lastOnline = map[string]int64{}
}
// Prune stale local-online entries (no local active emails to add here —
// only the local xray poll feeds those) so a stopped local xray's clients
// still age out between traffic polls.
j.inboundService.RefreshLocalOnlineClients(nil)
if !websocket.HasClients() {
return
}
online := j.inboundService.GetOnlineClients()
if online == nil {
online = []string{}
}
websocket.BroadcastTraffic(map[string]any{
"onlineClients": online,
"onlineByNode": j.inboundService.GetOnlineClientsByNode(),
"lastOnlineMap": lastOnline,
})
clientStats := map[string]any{}
if stats, err := j.inboundService.GetAllClientTraffics(); err != nil {
logger.Warning("node traffic sync: get all client traffics for websocket failed:", err)
} else if len(stats) > 0 {
clientStats["clients"] = stats
}
if summary, err := j.inboundService.GetInboundsTrafficSummary(); err != nil {
logger.Warning("node traffic sync: get inbounds summary for websocket failed:", err)
} else if len(summary) > 0 {
clientStats["inbounds"] = summary
}
if len(clientStats) > 0 {
websocket.BroadcastClientStats(clientStats)
}
if j.structural.takeAndReset() {
websocket.BroadcastInvalidate(websocket.MessageTypeInbounds)
websocket.BroadcastInvalidate(websocket.MessageTypeClients)
}
}
func (j *NodeTrafficSyncJob) syncOne(mgr *runtime.Manager, n *model.Node) {
ctx, cancel := context.WithTimeout(context.Background(), nodeTrafficSyncRequestTimeout)
defer cancel()
rt, err := mgr.RemoteFor(n)
if err != nil {
logger.Warning("node traffic sync: remote lookup failed for", n.Name, ":", err)
return
}
snap, err := rt.FetchTrafficSnapshot(ctx)
if err != nil {
logger.Warning("node traffic sync: fetch from", n.Name, "failed:", err)
j.inboundService.ClearNodeOnlineClients(n.Id)
return
}
changed, err := j.inboundService.SetRemoteTraffic(n.Id, snap)
if err != nil {
logger.Warning("node traffic sync: merge for", n.Name, "failed:", err)
return
}
if changed {
j.structural.set()
}
}