From 3af2da01425d722cfbe7fed97e024250256e3ebc Mon Sep 17 00:00:00 2001 From: MHSanaei Date: Tue, 2 Jun 2026 18:33:21 +0200 Subject: [PATCH] fix(online): scope online status per node instead of a global union The inbounds page and Nodes page checked each client's email against a single deduped union of every node's online clients, so a client connected to one node showed as online on every inbound across every node. The local online set was also derived from the email-keyed client_traffics.last_online column, which remote-node syncs bump too, leaking remote-only clients onto local inbounds. Track online clients per node: the local panel's own xray clients under key 0 (derived from live traffic-poll deltas via RefreshLocalOnline, kept in memory and independent of the shared last_online column) and each remote node under its id. Add GetOnlineClientsByNode plus a /clients/onlinesByNode endpoint and onlineByNode WS field; node.go and the inbounds rollup now scope online by node. The flat GetOnlineClients union is kept for client-centric and total-count views (Clients page, dashboard, telegram). Closes #4809 --- frontend/public/openapi.json | 44 ++++++++- frontend/src/api/queryKeys.ts | 1 + frontend/src/pages/api-docs/endpoints.ts | 8 +- frontend/src/pages/inbounds/useInbounds.ts | 48 ++++++++- frontend/src/schemas/client.ts | 5 + web/controller/client.go | 5 + web/job/node_traffic_sync_job.go | 6 +- web/job/xray_traffic_job.go | 13 ++- web/service/inbound.go | 22 +++-- web/service/node.go | 26 +++-- xray/online_test.go | 110 +++++++++++++++++++++ xray/process.go | 77 +++++++++++++-- 12 files changed, 334 insertions(+), 31 deletions(-) create mode 100644 xray/online_test.go diff --git a/frontend/public/openapi.json b/frontend/public/openapi.json index 72befad5..05ea9182 100644 --- a/frontend/public/openapi.json +++ b/frontend/public/openapi.json @@ -3617,7 +3617,7 @@ "tags": [ "Clients" ], - "summary": "List the emails of currently connected clients (last seen within the heartbeat window).", + "summary": "List the emails of currently connected clients (last seen within the heartbeat window), deduped across every node.", "operationId": "post_panel_api_clients_onlines", "responses": { "200": { @@ -3649,6 +3649,48 @@ } } }, + "/panel/api/clients/onlinesByNode": { + "post": { + "tags": [ + "Clients" + ], + "summary": "Online client emails grouped by the node that reported them. The local panel uses key \"0\"; each remote node uses its node id. Lets the inbounds page show online status per node instead of merging every node together.", + "operationId": "post_panel_api_clients_onlinesByNode", + "responses": { + "200": { + "description": "Successful response", + "content": { + "application/json": { + "schema": { + "type": "object", + "properties": { + "success": { + "type": "boolean" + }, + "msg": { + "type": "string" + }, + "obj": {} + } + }, + "example": { + "success": true, + "obj": { + "0": [ + "user1" + ], + "3": [ + "user1", + "user2" + ] + } + } + } + } + } + } + } + }, "/panel/api/clients/lastOnline": { "post": { "tags": [ diff --git a/frontend/src/api/queryKeys.ts b/frontend/src/api/queryKeys.ts index 4f7f5037..e8288016 100644 --- a/frontend/src/api/queryKeys.ts +++ b/frontend/src/api/queryKeys.ts @@ -21,6 +21,7 @@ export const keys = { list: (params: unknown) => ['clients', 'list', params] as const, all: () => ['clients', 'all'] as const, onlines: () => ['clients', 'onlines'] as const, + onlinesByNode: () => ['clients', 'onlinesByNode'] as const, lastOnline: () => ['clients', 'lastOnline'] as const, groups: () => ['clients', 'groups'] as const, }, diff --git a/frontend/src/pages/api-docs/endpoints.ts b/frontend/src/pages/api-docs/endpoints.ts index 7183f9b0..240f816f 100644 --- a/frontend/src/pages/api-docs/endpoints.ts +++ b/frontend/src/pages/api-docs/endpoints.ts @@ -666,9 +666,15 @@ export const sections: readonly Section[] = [ { method: 'POST', path: '/panel/api/clients/onlines', - summary: 'List the emails of currently connected clients (last seen within the heartbeat window).', + summary: 'List the emails of currently connected clients (last seen within the heartbeat window), deduped across every node.', response: '{\n "success": true,\n "obj": ["user1", "user2"]\n}', }, + { + method: 'POST', + path: '/panel/api/clients/onlinesByNode', + summary: 'Online client emails grouped by the node that reported them. The local panel uses key "0"; each remote node uses its node id. Lets the inbounds page show online status per node instead of merging every node together.', + response: '{\n "success": true,\n "obj": {\n "0": ["user1"],\n "3": ["user1", "user2"]\n }\n}', + }, { method: 'POST', path: '/panel/api/clients/lastOnline', diff --git a/frontend/src/pages/inbounds/useInbounds.ts b/frontend/src/pages/inbounds/useInbounds.ts index ce1aadf4..5d13e854 100644 --- a/frontend/src/pages/inbounds/useInbounds.ts +++ b/frontend/src/pages/inbounds/useInbounds.ts @@ -9,7 +9,7 @@ import { isSSMultiUser } from '@/lib/xray/protocol-capabilities'; import { setDatepicker } from '@/hooks/useDatepicker'; import { keys } from '@/api/queryKeys'; import { SlimInboundListSchema, LastOnlineMapSchema, InboundDetailSchema } from '@/schemas/inbound'; -import { OnlinesSchema } from '@/schemas/client'; +import { OnlinesSchema, OnlineByNodeSchema } from '@/schemas/client'; import { DefaultsPayloadSchema, type DefaultsPayload } from '@/schemas/defaults'; export interface SubSettings { @@ -54,6 +54,25 @@ async function fetchOnlineClients(): Promise { return Array.isArray(validated.obj) ? validated.obj : []; } +// Online emails grouped by node id (local panel = key 0), used to scope the +// per-inbound online rollup so a client online on one node is not shown +// online on every node's inbounds. +async function fetchOnlineClientsByNode(): Promise> { + const msg = await HttpUtil.post('/panel/api/clients/onlinesByNode', undefined, { silent: true }); + if (!msg?.success) throw new Error(msg?.msg || 'Failed to fetch onlinesByNode'); + const validated = parseMsg(msg, OnlineByNodeSchema, 'clients/onlinesByNode'); + return (validated.obj && typeof validated.obj === 'object') ? (validated.obj as Record) : {}; +} + +function toNodeOnlineMap(data: Record): Map> { + const map = new Map>(); + for (const [key, emails] of Object.entries(data)) { + if (!Array.isArray(emails)) continue; + map.set(Number(key), new Set(emails)); + } + return map; +} + async function fetchLastOnlineMap(): Promise> { const msg = await HttpUtil.post('/panel/api/clients/lastOnline', undefined, { silent: true }); if (!msg?.success) throw new Error(msg?.msg || 'Failed to fetch lastOnline'); @@ -83,6 +102,12 @@ export function useInbounds() { staleTime: Infinity, }); + const onlinesByNodeQuery = useQuery({ + queryKey: keys.clients.onlinesByNode(), + queryFn: fetchOnlineClientsByNode, + staleTime: Infinity, + }); + const lastOnlineQuery = useQuery({ queryKey: keys.clients.lastOnline(), queryFn: fetchLastOnlineMap, @@ -135,6 +160,10 @@ export function useInbounds() { const onlineClientsRef = useRef([]); onlineClientsRef.current = onlineClients; + // Online emails keyed by node id (local inbounds = key 0). The rollup + // reads this so each inbound only counts clients online on its own node. + const onlineByNodeRef = useRef>>(new Map()); + const [lastOnlineMap, setLastOnlineMap] = useState>({}); const rollupClients = useCallback( @@ -151,12 +180,14 @@ export function useInbounds() { const comments = new Map(); const now = Date.now(); + const nodeOnline = onlineByNodeRef.current.get(dbInbound.nodeId ?? 0); + if (dbInbound.enable) { for (const client of clients) { if (client.comment && client.email) comments.set(client.email, client.comment); if (client.enable) { if (client.email) active.push(client.email); - if (client.email && onlineClientsRef.current.includes(client.email)) online.push(client.email); + if (client.email && nodeOnline?.has(client.email)) online.push(client.email); } else if (client.email) { deactive.push(client.email); } @@ -237,6 +268,13 @@ export function useInbounds() { } }, [onlinesQuery.data]); + useEffect(() => { + if (onlinesByNodeQuery.data) { + onlineByNodeRef.current = toNodeOnlineMap(onlinesByNodeQuery.data); + rebuildClientCount(); + } + }, [onlinesByNodeQuery.data, rebuildClientCount]); + useEffect(() => { if (lastOnlineQuery.data) setLastOnlineMap(lastOnlineQuery.data); }, [lastOnlineQuery.data]); @@ -255,6 +293,7 @@ export function useInbounds() { await Promise.all([ queryClient.invalidateQueries({ queryKey: keys.inbounds.root() }), queryClient.invalidateQueries({ queryKey: keys.clients.onlines() }), + queryClient.invalidateQueries({ queryKey: keys.clients.onlinesByNode() }), queryClient.invalidateQueries({ queryKey: keys.clients.lastOnline() }), queryClient.invalidateQueries({ queryKey: keys.xray.config() }), ]); @@ -284,11 +323,14 @@ export function useInbounds() { const applyTrafficEvent = useCallback( (payload: unknown) => { if (!payload || typeof payload !== 'object') return; - const p = payload as { onlineClients?: string[]; lastOnlineMap?: Record }; + const p = payload as { onlineClients?: string[]; onlineByNode?: Record; lastOnlineMap?: Record }; if (Array.isArray(p.onlineClients)) { onlineClientsRef.current = p.onlineClients; setOnlineClients(p.onlineClients); } + if (p.onlineByNode && typeof p.onlineByNode === 'object') { + onlineByNodeRef.current = toNodeOnlineMap(p.onlineByNode); + } if (p.lastOnlineMap && typeof p.lastOnlineMap === 'object') { setLastOnlineMap((prev) => ({ ...prev, ...p.lastOnlineMap! })); } diff --git a/frontend/src/schemas/client.ts b/frontend/src/schemas/client.ts index 52624b9c..3f3085f5 100644 --- a/frontend/src/schemas/client.ts +++ b/frontend/src/schemas/client.ts @@ -112,6 +112,11 @@ export const BulkDetachResultSchema = z.object({ export const OnlinesSchema = nullableStringArray; +export const OnlineByNodeSchema = z + .record(z.string(), nullableStringArray) + .nullable() + .transform((v) => v ?? {}); + export const GroupSummarySchema = z.object({ name: z.string(), clientCount: z.number(), diff --git a/web/controller/client.go b/web/controller/client.go index e57d26d9..9f2c1886 100644 --- a/web/controller/client.go +++ b/web/controller/client.go @@ -55,6 +55,7 @@ func (a *ClientController) initRouter(g *gin.RouterGroup) { g.POST("/ips/:email", a.getIps) g.POST("/clearIps/:email", a.clearIps) g.POST("/onlines", a.onlines) + g.POST("/onlinesByNode", a.onlinesByNode) g.POST("/lastOnline", a.lastOnline) } @@ -397,6 +398,10 @@ func (a *ClientController) onlines(c *gin.Context) { jsonObj(c, a.inboundService.GetOnlineClients(), nil) } +func (a *ClientController) onlinesByNode(c *gin.Context) { + jsonObj(c, a.inboundService.GetOnlineClientsByNode(), nil) +} + func (a *ClientController) lastOnline(c *gin.Context) { data, err := a.inboundService.GetClientsLastOnline() jsonObj(c, data, err) diff --git a/web/job/node_traffic_sync_job.go b/web/job/node_traffic_sync_job.go index 1cf32323..39fb752b 100644 --- a/web/job/node_traffic_sync_job.go +++ b/web/job/node_traffic_sync_job.go @@ -109,7 +109,10 @@ func (j *NodeTrafficSyncJob) Run() { lastOnline = map[string]int64{} } - j.inboundService.RefreshOnlineClientsFromMap(lastOnline) + // Prune stale local-online entries (no local active emails to add here — + // only the local xray poll feeds those) so a stopped local xray's clients + // still age out between traffic polls. + j.inboundService.RefreshLocalOnlineClients(nil) if !websocket.HasClients() { return @@ -121,6 +124,7 @@ func (j *NodeTrafficSyncJob) Run() { } websocket.BroadcastTraffic(map[string]any{ "onlineClients": online, + "onlineByNode": j.inboundService.GetOnlineClientsByNode(), "lastOnlineMap": lastOnline, }) diff --git a/web/job/xray_traffic_job.go b/web/job/xray_traffic_job.go index 583c5995..e88069d7 100644 --- a/web/job/xray_traffic_job.go +++ b/web/job/xray_traffic_job.go @@ -72,7 +72,17 @@ func (j *XrayTrafficJob) Run() { if lastOnlineMap == nil { lastOnlineMap = make(map[string]int64) } - j.inboundService.RefreshOnlineClientsFromMap(lastOnlineMap) + // Derive the local online set from this poll's per-email deltas rather + // than the shared last_online column, which remote-node syncs also bump + // and would otherwise make a client active only on a remote node appear + // online on local inbounds. + activeEmails := make([]string, 0, len(clientTraffics)) + for _, ct := range clientTraffics { + if ct != nil && ct.Up+ct.Down > 0 { + activeEmails = append(activeEmails, ct.Email) + } + } + j.inboundService.RefreshLocalOnlineClients(activeEmails) if !websocket.HasClients() { return @@ -86,6 +96,7 @@ func (j *XrayTrafficJob) Run() { "traffics": traffics, "clientTraffics": clientTraffics, "onlineClients": onlineClients, + "onlineByNode": j.inboundService.GetOnlineClientsByNode(), "lastOnlineMap": lastOnlineMap, }) diff --git a/web/service/inbound.go b/web/service/inbound.go index f8a0e918..0e3263ba 100644 --- a/web/service/inbound.go +++ b/web/service/inbound.go @@ -3259,6 +3259,13 @@ func (s *InboundService) GetOnlineClients() []string { return p.GetOnlineClients() } +func (s *InboundService) GetOnlineClientsByNode() map[int][]string { + if p == nil { + return map[int][]string{} + } + return p.GetOnlineClientsByNode() +} + func (s *InboundService) SetNodeOnlineClients(nodeID int, emails []string) { if p != nil { p.SetNodeOnlineClients(nodeID, emails) @@ -3285,16 +3292,13 @@ func (s *InboundService) GetClientsLastOnline() (map[string]int64, error) { return result, nil } -func (s *InboundService) RefreshOnlineClientsFromMap(lastOnlineMap map[string]int64) { - now := time.Now().UnixMilli() - newOnlineClients := make([]string, 0, len(lastOnlineMap)) - for email, lastOnline := range lastOnlineMap { - if now-lastOnline < onlineGracePeriodMs { - newOnlineClients = append(newOnlineClients, email) - } - } +// RefreshLocalOnlineClients folds the emails active on this panel's own +// xray this poll into the local online set, applying the grace window and +// pruning stale entries. Pass nil to only prune. See xray.Process for why +// the local set is kept separate from the shared last_online column. +func (s *InboundService) RefreshLocalOnlineClients(activeEmails []string) { if p != nil { - p.SetOnlineClients(newOnlineClients) + p.RefreshLocalOnline(activeEmails, time.Now().UnixMilli(), onlineGracePeriodMs) } } diff --git a/web/service/node.go b/web/service/node.go index d8f51e52..705b10e7 100644 --- a/web/service/node.go +++ b/web/service/node.go @@ -224,10 +224,7 @@ func (s *NodeService) GetAll() ([]*model.Node, error) { Select("inbound_id, email, enable, total, up, down, expiry_time"). Where("inbound_id IN ?", inboundIDs). Scan(&trafficRows).Error; err == nil { - online := make(map[string]struct{}) - for _, email := range s.onlineEmails() { - online[email] = struct{}{} - } + onlineByNodeSet := s.onlineEmailsByNode() depletedByNode := make(map[int]int) onlineByNode := make(map[int]int) for _, row := range trafficRows { @@ -240,8 +237,12 @@ func (s *NodeService) GetAll() ([]*model.Node, error) { if expired || exhausted || !row.Enable { depletedByNode[nodeID]++ } - if _, ok := online[row.Email]; ok { - onlineByNode[nodeID]++ + // Scope online by the node the inbound lives on: a client online + // on one node must not count as online on another. + if set, ok := onlineByNodeSet[nodeID]; ok { + if _, isOnline := set[row.Email]; isOnline { + onlineByNode[nodeID]++ + } } } for _, n := range nodes { @@ -254,9 +255,18 @@ func (s *NodeService) GetAll() ([]*model.Node, error) { return nodes, nil } -func (s *NodeService) onlineEmails() []string { +func (s *NodeService) onlineEmailsByNode() map[int]map[string]struct{} { svc := InboundService{} - return svc.GetOnlineClients() + byNode := svc.GetOnlineClientsByNode() + out := make(map[int]map[string]struct{}, len(byNode)) + for nodeID, emails := range byNode { + set := make(map[string]struct{}, len(emails)) + for _, email := range emails { + set[email] = struct{}{} + } + out[nodeID] = set + } + return out } func (s *NodeService) GetById(id int) (*model.Node, error) { diff --git a/xray/online_test.go b/xray/online_test.go new file mode 100644 index 00000000..c5d10288 --- /dev/null +++ b/xray/online_test.go @@ -0,0 +1,110 @@ +package xray + +import ( + "slices" + "testing" +) + +func newOnlineTestProcess() *Process { + return &Process{newProcess(nil)} +} + +func assertSameSet(t *testing.T, label string, got, want []string) { + t.Helper() + g := append([]string(nil), got...) + w := append([]string(nil), want...) + slices.Sort(g) + slices.Sort(w) + if !slices.Equal(g, w) { + t.Errorf("%s = %v, want %v", label, got, want) + } +} + +// TestGetOnlineClientsByNodeScopesPerNode pins the fix for issue #4809: a +// client online on one node must not be reported online on any other node. +func TestGetOnlineClientsByNodeScopesPerNode(t *testing.T) { + p := newOnlineTestProcess() + p.RefreshLocalOnline([]string{"user1"}, 1000, 20000) + p.SetNodeOnlineClients(3, []string{"user1", "user2"}) + p.SetNodeOnlineClients(5, []string{"user3"}) + + byNode := p.GetOnlineClientsByNode() + + assertSameSet(t, "local (key 0)", byNode[localNodeKey], []string{"user1"}) + assertSameSet(t, "node 3", byNode[3], []string{"user1", "user2"}) + assertSameSet(t, "node 5", byNode[5], []string{"user3"}) + + if slices.Contains(byNode[5], "user1") { + t.Errorf("user1 leaked onto node 5: %v", byNode[5]) + } + if slices.Contains(byNode[localNodeKey], "user3") || slices.Contains(byNode[3], "user3") { + t.Errorf("user3 leaked off node 5: local=%v node3=%v", byNode[localNodeKey], byNode[3]) + } +} + +// TestGetOnlineClientsByNodeOmitsEmptyGroups keeps the payload small: a node +// with no online clients (e.g. just cleared) must not appear as an empty key. +func TestGetOnlineClientsByNodeOmitsEmptyGroups(t *testing.T) { + p := newOnlineTestProcess() + p.SetNodeOnlineClients(3, []string{"user1"}) + p.SetNodeOnlineClients(7, []string{}) + + byNode := p.GetOnlineClientsByNode() + + if _, ok := byNode[7]; ok { + t.Errorf("node 7 has no online clients but is present: %v", byNode) + } + if _, ok := byNode[localNodeKey]; ok { + t.Errorf("no local clients online but key 0 is present: %v", byNode) + } +} + +// TestGetOnlineClientsUnionDedupes confirms the flat union (used by the +// client-centric / total-count views) still merges every node and dedupes. +func TestGetOnlineClientsUnionDedupes(t *testing.T) { + p := newOnlineTestProcess() + p.RefreshLocalOnline([]string{"user1"}, 1000, 20000) + p.SetNodeOnlineClients(3, []string{"user1", "user2"}) + + assertSameSet(t, "union", p.GetOnlineClients(), []string{"user1", "user2"}) +} + +// TestRefreshLocalOnlineGraceWindow checks the in-memory local set honours the +// grace window: idle-but-recent clients stay online, stale ones age out, and +// the set is derived only from local activity (never the shared DB column). +func TestRefreshLocalOnlineGraceWindow(t *testing.T) { + p := newOnlineTestProcess() + const grace = 20000 + + p.RefreshLocalOnline([]string{"user1"}, 1000, grace) + if got := p.GetOnlineClientsByNode()[localNodeKey]; !slices.Contains(got, "user1") { + t.Fatalf("user1 should be online right after activity, got %v", got) + } + + p.RefreshLocalOnline([]string{"user2"}, 11000, grace) + got := p.GetOnlineClientsByNode()[localNodeKey] + if !slices.Contains(got, "user1") || !slices.Contains(got, "user2") { + t.Fatalf("both within grace window, got %v", got) + } + + p.RefreshLocalOnline(nil, 22000, grace) + got = p.GetOnlineClientsByNode()[localNodeKey] + if slices.Contains(got, "user1") { + t.Errorf("user1 (idle 21s, past grace) should have aged out, got %v", got) + } + if !slices.Contains(got, "user2") { + t.Errorf("user2 (idle 11s, within grace) should still be online, got %v", got) + } +} + +// TestClearNodeOnlineClientsDropsNode mirrors a failed node probe: the node's +// clients must disappear from the per-node map immediately. +func TestClearNodeOnlineClientsDropsNode(t *testing.T) { + p := newOnlineTestProcess() + p.SetNodeOnlineClients(3, []string{"user1"}) + p.ClearNodeOnlineClients(3) + + if _, ok := p.GetOnlineClientsByNode()[3]; ok { + t.Errorf("node 3 should be absent after ClearNodeOnlineClients") + } +} diff --git a/xray/process.go b/xray/process.go index 89bd5fe5..cfe5d017 100644 --- a/xray/process.go +++ b/xray/process.go @@ -129,12 +129,24 @@ type process struct { version string apiPort int + // onlineClients is the set of emails active on THIS panel's own xray + // within the online grace window. It is derived only from local xray + // traffic polls (see RefreshLocalOnline) — never from remote-node + // snapshots — so a client connected solely to a remote node is not + // reported online on local inbounds. onlineClients []string + // localLastOnline records, per email, the last time this panel's own + // xray reported traffic for it. RefreshLocalOnline rebuilds + // onlineClients from this map each tick, keeping the local online set + // independent of the shared client_traffics.last_online column — that + // column is bumped by remote-node syncs too and would otherwise leak + // remote-only clients into the local set. + localLastOnline map[string]int64 // nodeOnlineClients holds the online-emails list reported by each // remote node, keyed by node id. NodeTrafficSyncJob populates entries // per cron tick and clears them when a node's probe fails. The mutex - // guards both this map and onlineClients above so GetOnlineClients - // can build the union without a torn read. + // guards this map, onlineClients, and localLastOnline above so the + // online getters never see a torn read. nodeOnlineClients map[int][]string onlineMu sync.RWMutex @@ -152,6 +164,12 @@ var ( xrayForceStopTimeout = 2 * time.Second ) +// localNodeKey is the GetOnlineClientsByNode key under which this panel's +// own (non-node-managed) inbounds report their online clients. Node ids +// autoincrement from 1, so 0 is a safe sentinel that never collides with a +// real node. The frontend mirrors this contract (nodeId ?? 0). +const localNodeKey = 0 + // newProcess creates a new internal process struct for Xray. func newProcess(config *Config) *process { return &process{ @@ -251,12 +269,57 @@ func (p *Process) GetOnlineClients() []string { return out } -// SetOnlineClients sets the locally-online list. Called by the local -// XrayTrafficJob after each xray gRPC stats poll. -func (p *Process) SetOnlineClients(users []string) { +// GetOnlineClientsByNode returns online emails grouped by the node that +// reported them: this panel's own xray clients under localNodeKey (0), and +// each remote node's clients under that node's id. Unlike GetOnlineClients +// (which flattens everything into one deduped union), this preserves node +// attribution so per-inbound/per-node online counts don't bleed a client +// connected to one node onto every other node. Empty groups are omitted. +func (p *Process) GetOnlineClientsByNode() map[int][]string { + p.onlineMu.RLock() + defer p.onlineMu.RUnlock() + + out := make(map[int][]string, len(p.nodeOnlineClients)+1) + if len(p.onlineClients) > 0 { + local := make([]string, len(p.onlineClients)) + copy(local, p.onlineClients) + out[localNodeKey] = local + } + for nodeID, list := range p.nodeOnlineClients { + if len(list) == 0 { + continue + } + cp := make([]string, len(list)) + copy(cp, list) + out[nodeID] = cp + } + return out +} + +// RefreshLocalOnline records that each email in activeEmails had local xray +// traffic at now, then rebuilds onlineClients from every email seen within +// graceMs and prunes entries older than that. Called by the local +// XrayTrafficJob after each xray gRPC stats poll. Pass a nil/empty +// activeEmails to only prune — NodeTrafficSyncJob does this so a stopped +// local xray's clients still age out between local traffic polls. +func (p *Process) RefreshLocalOnline(activeEmails []string, now, graceMs int64) { p.onlineMu.Lock() - p.onlineClients = users - p.onlineMu.Unlock() + defer p.onlineMu.Unlock() + if p.localLastOnline == nil { + p.localLastOnline = make(map[string]int64, len(activeEmails)) + } + for _, email := range activeEmails { + p.localLastOnline[email] = now + } + online := make([]string, 0, len(p.localLastOnline)) + for email, ts := range p.localLastOnline { + if now-ts < graceMs { + online = append(online, email) + } else { + delete(p.localLastOnline, email) + } + } + p.onlineClients = online } // SetNodeOnlineClients records the online-emails set for one remote