mirror of
https://github.com/MHSanaei/3x-ui.git
synced 2026-06-03 02:49:36 +00:00
fix(online): scope online status per node instead of a global union
The inbounds page and Nodes page checked each client's email against a single deduped union of every node's online clients, so a client connected to one node showed as online on every inbound across every node. The local online set was also derived from the email-keyed client_traffics.last_online column, which remote-node syncs bump too, leaking remote-only clients onto local inbounds. Track online clients per node: the local panel's own xray clients under key 0 (derived from live traffic-poll deltas via RefreshLocalOnline, kept in memory and independent of the shared last_online column) and each remote node under its id. Add GetOnlineClientsByNode plus a /clients/onlinesByNode endpoint and onlineByNode WS field; node.go and the inbounds rollup now scope online by node. The flat GetOnlineClients union is kept for client-centric and total-count views (Clients page, dashboard, telegram). Closes #4809
This commit is contained in:
@@ -3617,7 +3617,7 @@
|
||||
"tags": [
|
||||
"Clients"
|
||||
],
|
||||
"summary": "List the emails of currently connected clients (last seen within the heartbeat window).",
|
||||
"summary": "List the emails of currently connected clients (last seen within the heartbeat window), deduped across every node.",
|
||||
"operationId": "post_panel_api_clients_onlines",
|
||||
"responses": {
|
||||
"200": {
|
||||
@@ -3649,6 +3649,48 @@
|
||||
}
|
||||
}
|
||||
},
|
||||
"/panel/api/clients/onlinesByNode": {
|
||||
"post": {
|
||||
"tags": [
|
||||
"Clients"
|
||||
],
|
||||
"summary": "Online client emails grouped by the node that reported them. The local panel uses key \"0\"; each remote node uses its node id. Lets the inbounds page show online status per node instead of merging every node together.",
|
||||
"operationId": "post_panel_api_clients_onlinesByNode",
|
||||
"responses": {
|
||||
"200": {
|
||||
"description": "Successful response",
|
||||
"content": {
|
||||
"application/json": {
|
||||
"schema": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"success": {
|
||||
"type": "boolean"
|
||||
},
|
||||
"msg": {
|
||||
"type": "string"
|
||||
},
|
||||
"obj": {}
|
||||
}
|
||||
},
|
||||
"example": {
|
||||
"success": true,
|
||||
"obj": {
|
||||
"0": [
|
||||
"user1"
|
||||
],
|
||||
"3": [
|
||||
"user1",
|
||||
"user2"
|
||||
]
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
"/panel/api/clients/lastOnline": {
|
||||
"post": {
|
||||
"tags": [
|
||||
|
||||
@@ -21,6 +21,7 @@ export const keys = {
|
||||
list: (params: unknown) => ['clients', 'list', params] as const,
|
||||
all: () => ['clients', 'all'] as const,
|
||||
onlines: () => ['clients', 'onlines'] as const,
|
||||
onlinesByNode: () => ['clients', 'onlinesByNode'] as const,
|
||||
lastOnline: () => ['clients', 'lastOnline'] as const,
|
||||
groups: () => ['clients', 'groups'] as const,
|
||||
},
|
||||
|
||||
@@ -666,9 +666,15 @@ export const sections: readonly Section[] = [
|
||||
{
|
||||
method: 'POST',
|
||||
path: '/panel/api/clients/onlines',
|
||||
summary: 'List the emails of currently connected clients (last seen within the heartbeat window).',
|
||||
summary: 'List the emails of currently connected clients (last seen within the heartbeat window), deduped across every node.',
|
||||
response: '{\n "success": true,\n "obj": ["user1", "user2"]\n}',
|
||||
},
|
||||
{
|
||||
method: 'POST',
|
||||
path: '/panel/api/clients/onlinesByNode',
|
||||
summary: 'Online client emails grouped by the node that reported them. The local panel uses key "0"; each remote node uses its node id. Lets the inbounds page show online status per node instead of merging every node together.',
|
||||
response: '{\n "success": true,\n "obj": {\n "0": ["user1"],\n "3": ["user1", "user2"]\n }\n}',
|
||||
},
|
||||
{
|
||||
method: 'POST',
|
||||
path: '/panel/api/clients/lastOnline',
|
||||
|
||||
@@ -9,7 +9,7 @@ import { isSSMultiUser } from '@/lib/xray/protocol-capabilities';
|
||||
import { setDatepicker } from '@/hooks/useDatepicker';
|
||||
import { keys } from '@/api/queryKeys';
|
||||
import { SlimInboundListSchema, LastOnlineMapSchema, InboundDetailSchema } from '@/schemas/inbound';
|
||||
import { OnlinesSchema } from '@/schemas/client';
|
||||
import { OnlinesSchema, OnlineByNodeSchema } from '@/schemas/client';
|
||||
import { DefaultsPayloadSchema, type DefaultsPayload } from '@/schemas/defaults';
|
||||
|
||||
export interface SubSettings {
|
||||
@@ -54,6 +54,25 @@ async function fetchOnlineClients(): Promise<string[]> {
|
||||
return Array.isArray(validated.obj) ? validated.obj : [];
|
||||
}
|
||||
|
||||
// Online emails grouped by node id (local panel = key 0), used to scope the
|
||||
// per-inbound online rollup so a client online on one node is not shown
|
||||
// online on every node's inbounds.
|
||||
async function fetchOnlineClientsByNode(): Promise<Record<string, string[]>> {
|
||||
const msg = await HttpUtil.post('/panel/api/clients/onlinesByNode', undefined, { silent: true });
|
||||
if (!msg?.success) throw new Error(msg?.msg || 'Failed to fetch onlinesByNode');
|
||||
const validated = parseMsg(msg, OnlineByNodeSchema, 'clients/onlinesByNode');
|
||||
return (validated.obj && typeof validated.obj === 'object') ? (validated.obj as Record<string, string[]>) : {};
|
||||
}
|
||||
|
||||
function toNodeOnlineMap(data: Record<string, string[]>): Map<number, Set<string>> {
|
||||
const map = new Map<number, Set<string>>();
|
||||
for (const [key, emails] of Object.entries(data)) {
|
||||
if (!Array.isArray(emails)) continue;
|
||||
map.set(Number(key), new Set(emails));
|
||||
}
|
||||
return map;
|
||||
}
|
||||
|
||||
async function fetchLastOnlineMap(): Promise<Record<string, number>> {
|
||||
const msg = await HttpUtil.post('/panel/api/clients/lastOnline', undefined, { silent: true });
|
||||
if (!msg?.success) throw new Error(msg?.msg || 'Failed to fetch lastOnline');
|
||||
@@ -83,6 +102,12 @@ export function useInbounds() {
|
||||
staleTime: Infinity,
|
||||
});
|
||||
|
||||
const onlinesByNodeQuery = useQuery({
|
||||
queryKey: keys.clients.onlinesByNode(),
|
||||
queryFn: fetchOnlineClientsByNode,
|
||||
staleTime: Infinity,
|
||||
});
|
||||
|
||||
const lastOnlineQuery = useQuery({
|
||||
queryKey: keys.clients.lastOnline(),
|
||||
queryFn: fetchLastOnlineMap,
|
||||
@@ -135,6 +160,10 @@ export function useInbounds() {
|
||||
const onlineClientsRef = useRef<string[]>([]);
|
||||
onlineClientsRef.current = onlineClients;
|
||||
|
||||
// Online emails keyed by node id (local inbounds = key 0). The rollup
|
||||
// reads this so each inbound only counts clients online on its own node.
|
||||
const onlineByNodeRef = useRef<Map<number, Set<string>>>(new Map());
|
||||
|
||||
const [lastOnlineMap, setLastOnlineMap] = useState<Record<string, number>>({});
|
||||
|
||||
const rollupClients = useCallback(
|
||||
@@ -151,12 +180,14 @@ export function useInbounds() {
|
||||
const comments = new Map<string, string>();
|
||||
const now = Date.now();
|
||||
|
||||
const nodeOnline = onlineByNodeRef.current.get(dbInbound.nodeId ?? 0);
|
||||
|
||||
if (dbInbound.enable) {
|
||||
for (const client of clients) {
|
||||
if (client.comment && client.email) comments.set(client.email, client.comment);
|
||||
if (client.enable) {
|
||||
if (client.email) active.push(client.email);
|
||||
if (client.email && onlineClientsRef.current.includes(client.email)) online.push(client.email);
|
||||
if (client.email && nodeOnline?.has(client.email)) online.push(client.email);
|
||||
} else if (client.email) {
|
||||
deactive.push(client.email);
|
||||
}
|
||||
@@ -237,6 +268,13 @@ export function useInbounds() {
|
||||
}
|
||||
}, [onlinesQuery.data]);
|
||||
|
||||
useEffect(() => {
|
||||
if (onlinesByNodeQuery.data) {
|
||||
onlineByNodeRef.current = toNodeOnlineMap(onlinesByNodeQuery.data);
|
||||
rebuildClientCount();
|
||||
}
|
||||
}, [onlinesByNodeQuery.data, rebuildClientCount]);
|
||||
|
||||
useEffect(() => {
|
||||
if (lastOnlineQuery.data) setLastOnlineMap(lastOnlineQuery.data);
|
||||
}, [lastOnlineQuery.data]);
|
||||
@@ -255,6 +293,7 @@ export function useInbounds() {
|
||||
await Promise.all([
|
||||
queryClient.invalidateQueries({ queryKey: keys.inbounds.root() }),
|
||||
queryClient.invalidateQueries({ queryKey: keys.clients.onlines() }),
|
||||
queryClient.invalidateQueries({ queryKey: keys.clients.onlinesByNode() }),
|
||||
queryClient.invalidateQueries({ queryKey: keys.clients.lastOnline() }),
|
||||
queryClient.invalidateQueries({ queryKey: keys.xray.config() }),
|
||||
]);
|
||||
@@ -284,11 +323,14 @@ export function useInbounds() {
|
||||
const applyTrafficEvent = useCallback(
|
||||
(payload: unknown) => {
|
||||
if (!payload || typeof payload !== 'object') return;
|
||||
const p = payload as { onlineClients?: string[]; lastOnlineMap?: Record<string, number> };
|
||||
const p = payload as { onlineClients?: string[]; onlineByNode?: Record<string, string[]>; lastOnlineMap?: Record<string, number> };
|
||||
if (Array.isArray(p.onlineClients)) {
|
||||
onlineClientsRef.current = p.onlineClients;
|
||||
setOnlineClients(p.onlineClients);
|
||||
}
|
||||
if (p.onlineByNode && typeof p.onlineByNode === 'object') {
|
||||
onlineByNodeRef.current = toNodeOnlineMap(p.onlineByNode);
|
||||
}
|
||||
if (p.lastOnlineMap && typeof p.lastOnlineMap === 'object') {
|
||||
setLastOnlineMap((prev) => ({ ...prev, ...p.lastOnlineMap! }));
|
||||
}
|
||||
|
||||
@@ -112,6 +112,11 @@ export const BulkDetachResultSchema = z.object({
|
||||
|
||||
export const OnlinesSchema = nullableStringArray;
|
||||
|
||||
export const OnlineByNodeSchema = z
|
||||
.record(z.string(), nullableStringArray)
|
||||
.nullable()
|
||||
.transform((v) => v ?? {});
|
||||
|
||||
export const GroupSummarySchema = z.object({
|
||||
name: z.string(),
|
||||
clientCount: z.number(),
|
||||
|
||||
@@ -55,6 +55,7 @@ func (a *ClientController) initRouter(g *gin.RouterGroup) {
|
||||
g.POST("/ips/:email", a.getIps)
|
||||
g.POST("/clearIps/:email", a.clearIps)
|
||||
g.POST("/onlines", a.onlines)
|
||||
g.POST("/onlinesByNode", a.onlinesByNode)
|
||||
g.POST("/lastOnline", a.lastOnline)
|
||||
}
|
||||
|
||||
@@ -397,6 +398,10 @@ func (a *ClientController) onlines(c *gin.Context) {
|
||||
jsonObj(c, a.inboundService.GetOnlineClients(), nil)
|
||||
}
|
||||
|
||||
func (a *ClientController) onlinesByNode(c *gin.Context) {
|
||||
jsonObj(c, a.inboundService.GetOnlineClientsByNode(), nil)
|
||||
}
|
||||
|
||||
func (a *ClientController) lastOnline(c *gin.Context) {
|
||||
data, err := a.inboundService.GetClientsLastOnline()
|
||||
jsonObj(c, data, err)
|
||||
|
||||
@@ -109,7 +109,10 @@ func (j *NodeTrafficSyncJob) Run() {
|
||||
lastOnline = map[string]int64{}
|
||||
}
|
||||
|
||||
j.inboundService.RefreshOnlineClientsFromMap(lastOnline)
|
||||
// Prune stale local-online entries (no local active emails to add here —
|
||||
// only the local xray poll feeds those) so a stopped local xray's clients
|
||||
// still age out between traffic polls.
|
||||
j.inboundService.RefreshLocalOnlineClients(nil)
|
||||
|
||||
if !websocket.HasClients() {
|
||||
return
|
||||
@@ -121,6 +124,7 @@ func (j *NodeTrafficSyncJob) Run() {
|
||||
}
|
||||
websocket.BroadcastTraffic(map[string]any{
|
||||
"onlineClients": online,
|
||||
"onlineByNode": j.inboundService.GetOnlineClientsByNode(),
|
||||
"lastOnlineMap": lastOnline,
|
||||
})
|
||||
|
||||
|
||||
@@ -72,7 +72,17 @@ func (j *XrayTrafficJob) Run() {
|
||||
if lastOnlineMap == nil {
|
||||
lastOnlineMap = make(map[string]int64)
|
||||
}
|
||||
j.inboundService.RefreshOnlineClientsFromMap(lastOnlineMap)
|
||||
// Derive the local online set from this poll's per-email deltas rather
|
||||
// than the shared last_online column, which remote-node syncs also bump
|
||||
// and would otherwise make a client active only on a remote node appear
|
||||
// online on local inbounds.
|
||||
activeEmails := make([]string, 0, len(clientTraffics))
|
||||
for _, ct := range clientTraffics {
|
||||
if ct != nil && ct.Up+ct.Down > 0 {
|
||||
activeEmails = append(activeEmails, ct.Email)
|
||||
}
|
||||
}
|
||||
j.inboundService.RefreshLocalOnlineClients(activeEmails)
|
||||
|
||||
if !websocket.HasClients() {
|
||||
return
|
||||
@@ -86,6 +96,7 @@ func (j *XrayTrafficJob) Run() {
|
||||
"traffics": traffics,
|
||||
"clientTraffics": clientTraffics,
|
||||
"onlineClients": onlineClients,
|
||||
"onlineByNode": j.inboundService.GetOnlineClientsByNode(),
|
||||
"lastOnlineMap": lastOnlineMap,
|
||||
})
|
||||
|
||||
|
||||
@@ -3259,6 +3259,13 @@ func (s *InboundService) GetOnlineClients() []string {
|
||||
return p.GetOnlineClients()
|
||||
}
|
||||
|
||||
func (s *InboundService) GetOnlineClientsByNode() map[int][]string {
|
||||
if p == nil {
|
||||
return map[int][]string{}
|
||||
}
|
||||
return p.GetOnlineClientsByNode()
|
||||
}
|
||||
|
||||
func (s *InboundService) SetNodeOnlineClients(nodeID int, emails []string) {
|
||||
if p != nil {
|
||||
p.SetNodeOnlineClients(nodeID, emails)
|
||||
@@ -3285,16 +3292,13 @@ func (s *InboundService) GetClientsLastOnline() (map[string]int64, error) {
|
||||
return result, nil
|
||||
}
|
||||
|
||||
func (s *InboundService) RefreshOnlineClientsFromMap(lastOnlineMap map[string]int64) {
|
||||
now := time.Now().UnixMilli()
|
||||
newOnlineClients := make([]string, 0, len(lastOnlineMap))
|
||||
for email, lastOnline := range lastOnlineMap {
|
||||
if now-lastOnline < onlineGracePeriodMs {
|
||||
newOnlineClients = append(newOnlineClients, email)
|
||||
}
|
||||
}
|
||||
// RefreshLocalOnlineClients folds the emails active on this panel's own
|
||||
// xray this poll into the local online set, applying the grace window and
|
||||
// pruning stale entries. Pass nil to only prune. See xray.Process for why
|
||||
// the local set is kept separate from the shared last_online column.
|
||||
func (s *InboundService) RefreshLocalOnlineClients(activeEmails []string) {
|
||||
if p != nil {
|
||||
p.SetOnlineClients(newOnlineClients)
|
||||
p.RefreshLocalOnline(activeEmails, time.Now().UnixMilli(), onlineGracePeriodMs)
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -224,10 +224,7 @@ func (s *NodeService) GetAll() ([]*model.Node, error) {
|
||||
Select("inbound_id, email, enable, total, up, down, expiry_time").
|
||||
Where("inbound_id IN ?", inboundIDs).
|
||||
Scan(&trafficRows).Error; err == nil {
|
||||
online := make(map[string]struct{})
|
||||
for _, email := range s.onlineEmails() {
|
||||
online[email] = struct{}{}
|
||||
}
|
||||
onlineByNodeSet := s.onlineEmailsByNode()
|
||||
depletedByNode := make(map[int]int)
|
||||
onlineByNode := make(map[int]int)
|
||||
for _, row := range trafficRows {
|
||||
@@ -240,8 +237,12 @@ func (s *NodeService) GetAll() ([]*model.Node, error) {
|
||||
if expired || exhausted || !row.Enable {
|
||||
depletedByNode[nodeID]++
|
||||
}
|
||||
if _, ok := online[row.Email]; ok {
|
||||
onlineByNode[nodeID]++
|
||||
// Scope online by the node the inbound lives on: a client online
|
||||
// on one node must not count as online on another.
|
||||
if set, ok := onlineByNodeSet[nodeID]; ok {
|
||||
if _, isOnline := set[row.Email]; isOnline {
|
||||
onlineByNode[nodeID]++
|
||||
}
|
||||
}
|
||||
}
|
||||
for _, n := range nodes {
|
||||
@@ -254,9 +255,18 @@ func (s *NodeService) GetAll() ([]*model.Node, error) {
|
||||
return nodes, nil
|
||||
}
|
||||
|
||||
func (s *NodeService) onlineEmails() []string {
|
||||
func (s *NodeService) onlineEmailsByNode() map[int]map[string]struct{} {
|
||||
svc := InboundService{}
|
||||
return svc.GetOnlineClients()
|
||||
byNode := svc.GetOnlineClientsByNode()
|
||||
out := make(map[int]map[string]struct{}, len(byNode))
|
||||
for nodeID, emails := range byNode {
|
||||
set := make(map[string]struct{}, len(emails))
|
||||
for _, email := range emails {
|
||||
set[email] = struct{}{}
|
||||
}
|
||||
out[nodeID] = set
|
||||
}
|
||||
return out
|
||||
}
|
||||
|
||||
func (s *NodeService) GetById(id int) (*model.Node, error) {
|
||||
|
||||
110
xray/online_test.go
Normal file
110
xray/online_test.go
Normal file
@@ -0,0 +1,110 @@
|
||||
package xray
|
||||
|
||||
import (
|
||||
"slices"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func newOnlineTestProcess() *Process {
|
||||
return &Process{newProcess(nil)}
|
||||
}
|
||||
|
||||
func assertSameSet(t *testing.T, label string, got, want []string) {
|
||||
t.Helper()
|
||||
g := append([]string(nil), got...)
|
||||
w := append([]string(nil), want...)
|
||||
slices.Sort(g)
|
||||
slices.Sort(w)
|
||||
if !slices.Equal(g, w) {
|
||||
t.Errorf("%s = %v, want %v", label, got, want)
|
||||
}
|
||||
}
|
||||
|
||||
// TestGetOnlineClientsByNodeScopesPerNode pins the fix for issue #4809: a
|
||||
// client online on one node must not be reported online on any other node.
|
||||
func TestGetOnlineClientsByNodeScopesPerNode(t *testing.T) {
|
||||
p := newOnlineTestProcess()
|
||||
p.RefreshLocalOnline([]string{"user1"}, 1000, 20000)
|
||||
p.SetNodeOnlineClients(3, []string{"user1", "user2"})
|
||||
p.SetNodeOnlineClients(5, []string{"user3"})
|
||||
|
||||
byNode := p.GetOnlineClientsByNode()
|
||||
|
||||
assertSameSet(t, "local (key 0)", byNode[localNodeKey], []string{"user1"})
|
||||
assertSameSet(t, "node 3", byNode[3], []string{"user1", "user2"})
|
||||
assertSameSet(t, "node 5", byNode[5], []string{"user3"})
|
||||
|
||||
if slices.Contains(byNode[5], "user1") {
|
||||
t.Errorf("user1 leaked onto node 5: %v", byNode[5])
|
||||
}
|
||||
if slices.Contains(byNode[localNodeKey], "user3") || slices.Contains(byNode[3], "user3") {
|
||||
t.Errorf("user3 leaked off node 5: local=%v node3=%v", byNode[localNodeKey], byNode[3])
|
||||
}
|
||||
}
|
||||
|
||||
// TestGetOnlineClientsByNodeOmitsEmptyGroups keeps the payload small: a node
|
||||
// with no online clients (e.g. just cleared) must not appear as an empty key.
|
||||
func TestGetOnlineClientsByNodeOmitsEmptyGroups(t *testing.T) {
|
||||
p := newOnlineTestProcess()
|
||||
p.SetNodeOnlineClients(3, []string{"user1"})
|
||||
p.SetNodeOnlineClients(7, []string{})
|
||||
|
||||
byNode := p.GetOnlineClientsByNode()
|
||||
|
||||
if _, ok := byNode[7]; ok {
|
||||
t.Errorf("node 7 has no online clients but is present: %v", byNode)
|
||||
}
|
||||
if _, ok := byNode[localNodeKey]; ok {
|
||||
t.Errorf("no local clients online but key 0 is present: %v", byNode)
|
||||
}
|
||||
}
|
||||
|
||||
// TestGetOnlineClientsUnionDedupes confirms the flat union (used by the
|
||||
// client-centric / total-count views) still merges every node and dedupes.
|
||||
func TestGetOnlineClientsUnionDedupes(t *testing.T) {
|
||||
p := newOnlineTestProcess()
|
||||
p.RefreshLocalOnline([]string{"user1"}, 1000, 20000)
|
||||
p.SetNodeOnlineClients(3, []string{"user1", "user2"})
|
||||
|
||||
assertSameSet(t, "union", p.GetOnlineClients(), []string{"user1", "user2"})
|
||||
}
|
||||
|
||||
// TestRefreshLocalOnlineGraceWindow checks the in-memory local set honours the
|
||||
// grace window: idle-but-recent clients stay online, stale ones age out, and
|
||||
// the set is derived only from local activity (never the shared DB column).
|
||||
func TestRefreshLocalOnlineGraceWindow(t *testing.T) {
|
||||
p := newOnlineTestProcess()
|
||||
const grace = 20000
|
||||
|
||||
p.RefreshLocalOnline([]string{"user1"}, 1000, grace)
|
||||
if got := p.GetOnlineClientsByNode()[localNodeKey]; !slices.Contains(got, "user1") {
|
||||
t.Fatalf("user1 should be online right after activity, got %v", got)
|
||||
}
|
||||
|
||||
p.RefreshLocalOnline([]string{"user2"}, 11000, grace)
|
||||
got := p.GetOnlineClientsByNode()[localNodeKey]
|
||||
if !slices.Contains(got, "user1") || !slices.Contains(got, "user2") {
|
||||
t.Fatalf("both within grace window, got %v", got)
|
||||
}
|
||||
|
||||
p.RefreshLocalOnline(nil, 22000, grace)
|
||||
got = p.GetOnlineClientsByNode()[localNodeKey]
|
||||
if slices.Contains(got, "user1") {
|
||||
t.Errorf("user1 (idle 21s, past grace) should have aged out, got %v", got)
|
||||
}
|
||||
if !slices.Contains(got, "user2") {
|
||||
t.Errorf("user2 (idle 11s, within grace) should still be online, got %v", got)
|
||||
}
|
||||
}
|
||||
|
||||
// TestClearNodeOnlineClientsDropsNode mirrors a failed node probe: the node's
|
||||
// clients must disappear from the per-node map immediately.
|
||||
func TestClearNodeOnlineClientsDropsNode(t *testing.T) {
|
||||
p := newOnlineTestProcess()
|
||||
p.SetNodeOnlineClients(3, []string{"user1"})
|
||||
p.ClearNodeOnlineClients(3)
|
||||
|
||||
if _, ok := p.GetOnlineClientsByNode()[3]; ok {
|
||||
t.Errorf("node 3 should be absent after ClearNodeOnlineClients")
|
||||
}
|
||||
}
|
||||
@@ -129,12 +129,24 @@ type process struct {
|
||||
version string
|
||||
apiPort int
|
||||
|
||||
// onlineClients is the set of emails active on THIS panel's own xray
|
||||
// within the online grace window. It is derived only from local xray
|
||||
// traffic polls (see RefreshLocalOnline) — never from remote-node
|
||||
// snapshots — so a client connected solely to a remote node is not
|
||||
// reported online on local inbounds.
|
||||
onlineClients []string
|
||||
// localLastOnline records, per email, the last time this panel's own
|
||||
// xray reported traffic for it. RefreshLocalOnline rebuilds
|
||||
// onlineClients from this map each tick, keeping the local online set
|
||||
// independent of the shared client_traffics.last_online column — that
|
||||
// column is bumped by remote-node syncs too and would otherwise leak
|
||||
// remote-only clients into the local set.
|
||||
localLastOnline map[string]int64
|
||||
// nodeOnlineClients holds the online-emails list reported by each
|
||||
// remote node, keyed by node id. NodeTrafficSyncJob populates entries
|
||||
// per cron tick and clears them when a node's probe fails. The mutex
|
||||
// guards both this map and onlineClients above so GetOnlineClients
|
||||
// can build the union without a torn read.
|
||||
// guards this map, onlineClients, and localLastOnline above so the
|
||||
// online getters never see a torn read.
|
||||
nodeOnlineClients map[int][]string
|
||||
onlineMu sync.RWMutex
|
||||
|
||||
@@ -152,6 +164,12 @@ var (
|
||||
xrayForceStopTimeout = 2 * time.Second
|
||||
)
|
||||
|
||||
// localNodeKey is the GetOnlineClientsByNode key under which this panel's
|
||||
// own (non-node-managed) inbounds report their online clients. Node ids
|
||||
// autoincrement from 1, so 0 is a safe sentinel that never collides with a
|
||||
// real node. The frontend mirrors this contract (nodeId ?? 0).
|
||||
const localNodeKey = 0
|
||||
|
||||
// newProcess creates a new internal process struct for Xray.
|
||||
func newProcess(config *Config) *process {
|
||||
return &process{
|
||||
@@ -251,12 +269,57 @@ func (p *Process) GetOnlineClients() []string {
|
||||
return out
|
||||
}
|
||||
|
||||
// SetOnlineClients sets the locally-online list. Called by the local
|
||||
// XrayTrafficJob after each xray gRPC stats poll.
|
||||
func (p *Process) SetOnlineClients(users []string) {
|
||||
// GetOnlineClientsByNode returns online emails grouped by the node that
|
||||
// reported them: this panel's own xray clients under localNodeKey (0), and
|
||||
// each remote node's clients under that node's id. Unlike GetOnlineClients
|
||||
// (which flattens everything into one deduped union), this preserves node
|
||||
// attribution so per-inbound/per-node online counts don't bleed a client
|
||||
// connected to one node onto every other node. Empty groups are omitted.
|
||||
func (p *Process) GetOnlineClientsByNode() map[int][]string {
|
||||
p.onlineMu.RLock()
|
||||
defer p.onlineMu.RUnlock()
|
||||
|
||||
out := make(map[int][]string, len(p.nodeOnlineClients)+1)
|
||||
if len(p.onlineClients) > 0 {
|
||||
local := make([]string, len(p.onlineClients))
|
||||
copy(local, p.onlineClients)
|
||||
out[localNodeKey] = local
|
||||
}
|
||||
for nodeID, list := range p.nodeOnlineClients {
|
||||
if len(list) == 0 {
|
||||
continue
|
||||
}
|
||||
cp := make([]string, len(list))
|
||||
copy(cp, list)
|
||||
out[nodeID] = cp
|
||||
}
|
||||
return out
|
||||
}
|
||||
|
||||
// RefreshLocalOnline records that each email in activeEmails had local xray
|
||||
// traffic at now, then rebuilds onlineClients from every email seen within
|
||||
// graceMs and prunes entries older than that. Called by the local
|
||||
// XrayTrafficJob after each xray gRPC stats poll. Pass a nil/empty
|
||||
// activeEmails to only prune — NodeTrafficSyncJob does this so a stopped
|
||||
// local xray's clients still age out between local traffic polls.
|
||||
func (p *Process) RefreshLocalOnline(activeEmails []string, now, graceMs int64) {
|
||||
p.onlineMu.Lock()
|
||||
p.onlineClients = users
|
||||
p.onlineMu.Unlock()
|
||||
defer p.onlineMu.Unlock()
|
||||
if p.localLastOnline == nil {
|
||||
p.localLastOnline = make(map[string]int64, len(activeEmails))
|
||||
}
|
||||
for _, email := range activeEmails {
|
||||
p.localLastOnline[email] = now
|
||||
}
|
||||
online := make([]string, 0, len(p.localLastOnline))
|
||||
for email, ts := range p.localLastOnline {
|
||||
if now-ts < graceMs {
|
||||
online = append(online, email)
|
||||
} else {
|
||||
delete(p.localLastOnline, email)
|
||||
}
|
||||
}
|
||||
p.onlineClients = online
|
||||
}
|
||||
|
||||
// SetNodeOnlineClients records the online-emails set for one remote
|
||||
|
||||
Reference in New Issue
Block a user