fix(online): scope per-inbound online to inbounds that carried traffic

Multi-inbound clients showed online on every inbound they were attached to. Xray's user-level traffic stat aggregates across all inbounds a client belongs to, so the email signal alone can't say which inbound was used.

Pair it with the inbound-level traffic signal under the same 20s grace and gate the per-inbound rollup on it: a client only shows online on inbounds that actually moved bytes this window. Remote nodes report no per-inbound activity and stay ungated (no regression). Adds GetActiveInboundsByNode, the activeInbounds WS field and POST /panel/api/clients/activeInbounds.

Fixes #4859
This commit is contained in:
MHSanaei
2026-06-03 16:19:00 +02:00
parent 5fb18b8819
commit ef8882a5c0
11 changed files with 224 additions and 30 deletions

View File

@@ -3691,6 +3691,45 @@
}
}
},
"/panel/api/clients/activeInbounds": {
"post": {
"tags": [
"Clients"
],
"summary": "Inbound tags that carried traffic within the heartbeat window, grouped by node (local panel uses key \"0\"). Pairs with onlinesByNode so the inbounds page only marks a multi-inbound client online on the inbounds it actually used. Nodes that do not report per-inbound activity are absent.",
"operationId": "post_panel_api_clients_activeInbounds",
"responses": {
"200": {
"description": "Successful response",
"content": {
"application/json": {
"schema": {
"type": "object",
"properties": {
"success": {
"type": "boolean"
},
"msg": {
"type": "string"
},
"obj": {}
}
},
"example": {
"success": true,
"obj": {
"0": [
"inbound-443",
"inbound-8443"
]
}
}
}
}
}
}
}
},
"/panel/api/clients/lastOnline": {
"post": {
"tags": [

View File

@@ -22,6 +22,7 @@ export const keys = {
all: () => ['clients', 'all'] as const,
onlines: () => ['clients', 'onlines'] as const,
onlinesByNode: () => ['clients', 'onlinesByNode'] as const,
activeInbounds: () => ['clients', 'activeInbounds'] as const,
lastOnline: () => ['clients', 'lastOnline'] as const,
groups: () => ['clients', 'groups'] as const,
},

View File

@@ -675,6 +675,12 @@ export const sections: readonly Section[] = [
summary: 'Online client emails grouped by the node that reported them. The local panel uses key "0"; each remote node uses its node id. Lets the inbounds page show online status per node instead of merging every node together.',
response: '{\n "success": true,\n "obj": {\n "0": ["user1"],\n "3": ["user1", "user2"]\n }\n}',
},
{
method: 'POST',
path: '/panel/api/clients/activeInbounds',
summary: 'Inbound tags that carried traffic within the heartbeat window, grouped by node (local panel uses key "0"). Pairs with onlinesByNode so the inbounds page only marks a multi-inbound client online on the inbounds it actually used. Nodes that do not report per-inbound activity are absent.',
response: '{\n "success": true,\n "obj": {\n "0": ["inbound-443", "inbound-8443"]\n }\n}',
},
{
method: 'POST',
path: '/panel/api/clients/lastOnline',

View File

@@ -9,7 +9,7 @@ import { isSSMultiUser } from '@/lib/xray/protocol-capabilities';
import { setDatepicker } from '@/hooks/useDatepicker';
import { keys } from '@/api/queryKeys';
import { SlimInboundListSchema, LastOnlineMapSchema, InboundDetailSchema } from '@/schemas/inbound';
import { OnlinesSchema, OnlineByNodeSchema } from '@/schemas/client';
import { OnlinesSchema, OnlineByNodeSchema, ActiveInboundsByNodeSchema } from '@/schemas/client';
import { DefaultsPayloadSchema, type DefaultsPayload } from '@/schemas/defaults';
export interface SubSettings {
@@ -68,6 +68,17 @@ async function fetchOnlineClientsByNode(): Promise<Record<string, string[]>> {
return (validated.obj && typeof validated.obj === 'object') ? (validated.obj as Record<string, string[]>) : {};
}
// Inbound tags that carried traffic recently, grouped by node (local = key 0).
// Pairs with the per-node online map so a client attached to several inbounds
// is only marked online on the ones that actually moved bytes — Xray's
// user-level stat can't attribute traffic to a single inbound on its own.
async function fetchActiveInboundsByNode(): Promise<Record<string, string[]>> {
const msg = await HttpUtil.post('/panel/api/clients/activeInbounds', undefined, { silent: true });
if (!msg?.success) throw new Error(msg?.msg || 'Failed to fetch activeInbounds');
const validated = parseMsg(msg, ActiveInboundsByNodeSchema, 'clients/activeInbounds');
return (validated.obj && typeof validated.obj === 'object') ? (validated.obj as Record<string, string[]>) : {};
}
function toNodeOnlineMap(data: Record<string, string[]>): Map<number, Set<string>> {
const map = new Map<number, Set<string>>();
for (const [key, emails] of Object.entries(data)) {
@@ -112,6 +123,12 @@ export function useInbounds() {
staleTime: Infinity,
});
const activeInboundsQuery = useQuery({
queryKey: keys.clients.activeInbounds(),
queryFn: fetchActiveInboundsByNode,
staleTime: Infinity,
});
const lastOnlineQuery = useQuery({
queryKey: keys.clients.lastOnline(),
queryFn: fetchLastOnlineMap,
@@ -169,6 +186,13 @@ export function useInbounds() {
// reads this so each inbound only counts clients online on its own node.
const onlineByNodeRef = useRef<Map<number, Set<string>>>(new Map());
// Recently-active inbound tags keyed by node id. A node missing from this
// map means "no per-inbound activity reported" (e.g. remote nodes), so the
// rollup leaves that node's inbounds ungated and falls back to the email
// signal. A present node gates: a client only counts online on an inbound
// whose tag carried traffic this window.
const activeByNodeRef = useRef<Map<number, Set<string>>>(new Map());
const [lastOnlineMap, setLastOnlineMap] = useState<Record<string, number>>({});
const rollupClients = useCallback(
@@ -185,14 +209,21 @@ export function useInbounds() {
const comments = new Map<string, string>();
const now = Date.now();
const nodeOnline = onlineByNodeRef.current.get(dbInbound.nodeId ?? 0);
const nodeId = dbInbound.nodeId ?? 0;
const nodeOnline = onlineByNodeRef.current.get(nodeId);
// A node absent from the active map reports no per-inbound activity, so
// leave its inbounds ungated. When present, only mark a client online on
// this inbound if its tag actually carried traffic — that's what stops a
// multi-inbound client lighting up every inbound it's attached to.
const activeForNode = activeByNodeRef.current.get(nodeId);
const inboundActive = activeForNode === undefined || !dbInbound.tag || activeForNode.has(dbInbound.tag);
if (dbInbound.enable) {
for (const client of clients) {
if (client.comment && client.email) comments.set(client.email, client.comment);
if (client.enable) {
if (client.email) active.push(client.email);
if (client.email && nodeOnline?.has(client.email)) online.push(client.email);
if (client.email && inboundActive && nodeOnline?.has(client.email)) online.push(client.email);
} else if (client.email) {
deactive.push(client.email);
}
@@ -280,6 +311,13 @@ export function useInbounds() {
}
}, [onlinesByNodeQuery.data, rebuildClientCount]);
useEffect(() => {
if (activeInboundsQuery.data) {
activeByNodeRef.current = toNodeOnlineMap(activeInboundsQuery.data);
rebuildClientCount();
}
}, [activeInboundsQuery.data, rebuildClientCount]);
useEffect(() => {
if (lastOnlineQuery.data) setLastOnlineMap(lastOnlineQuery.data);
}, [lastOnlineQuery.data]);
@@ -299,6 +337,7 @@ export function useInbounds() {
queryClient.invalidateQueries({ queryKey: keys.inbounds.root() }),
queryClient.invalidateQueries({ queryKey: keys.clients.onlines() }),
queryClient.invalidateQueries({ queryKey: keys.clients.onlinesByNode() }),
queryClient.invalidateQueries({ queryKey: keys.clients.activeInbounds() }),
queryClient.invalidateQueries({ queryKey: keys.clients.lastOnline() }),
queryClient.invalidateQueries({ queryKey: keys.xray.config() }),
]);
@@ -328,7 +367,7 @@ export function useInbounds() {
const applyTrafficEvent = useCallback(
(payload: unknown) => {
if (!payload || typeof payload !== 'object') return;
const p = payload as { onlineClients?: string[]; onlineByNode?: Record<string, string[]>; lastOnlineMap?: Record<string, number> };
const p = payload as { onlineClients?: string[]; onlineByNode?: Record<string, string[]>; activeInbounds?: Record<string, string[]>; lastOnlineMap?: Record<string, number> };
if (Array.isArray(p.onlineClients)) {
onlineClientsRef.current = p.onlineClients;
setOnlineClients(p.onlineClients);
@@ -336,6 +375,9 @@ export function useInbounds() {
if (p.onlineByNode && typeof p.onlineByNode === 'object') {
onlineByNodeRef.current = toNodeOnlineMap(p.onlineByNode);
}
if (p.activeInbounds && typeof p.activeInbounds === 'object') {
activeByNodeRef.current = toNodeOnlineMap(p.activeInbounds);
}
if (p.lastOnlineMap && typeof p.lastOnlineMap === 'object') {
setLastOnlineMap((prev) => ({ ...prev, ...p.lastOnlineMap! }));
}

View File

@@ -117,6 +117,11 @@ export const OnlineByNodeSchema = z
.nullable()
.transform((v) => v ?? {});
export const ActiveInboundsByNodeSchema = z
.record(z.string(), nullableStringArray)
.nullable()
.transform((v) => v ?? {});
export const GroupSummarySchema = z.object({
name: z.string(),
clientCount: z.number(),

View File

@@ -56,6 +56,7 @@ func (a *ClientController) initRouter(g *gin.RouterGroup) {
g.POST("/clearIps/:email", a.clearIps)
g.POST("/onlines", a.onlines)
g.POST("/onlinesByNode", a.onlinesByNode)
g.POST("/activeInbounds", a.activeInbounds)
g.POST("/lastOnline", a.lastOnline)
}
@@ -402,6 +403,10 @@ func (a *ClientController) onlinesByNode(c *gin.Context) {
jsonObj(c, a.inboundService.GetOnlineClientsByNode(), nil)
}
func (a *ClientController) activeInbounds(c *gin.Context) {
jsonObj(c, a.inboundService.GetActiveInboundsByNode(), nil)
}
func (a *ClientController) lastOnline(c *gin.Context) {
data, err := a.inboundService.GetClientsLastOnline()
jsonObj(c, data, err)

View File

@@ -109,10 +109,10 @@ func (j *NodeTrafficSyncJob) Run() {
lastOnline = map[string]int64{}
}
// Prune stale local-online entries (no local active emails to add here —
// only the local xray poll feeds those) so a stopped local xray's clients
// still age out between traffic polls.
j.inboundService.RefreshLocalOnlineClients(nil)
// Prune stale local-online entries (no local active emails or inbound tags
// to add here — only the local xray poll feeds those) so a stopped local
// xray's clients and inbounds still age out between traffic polls.
j.inboundService.RefreshLocalOnlineClients(nil, nil)
if !websocket.HasClients() {
return
@@ -123,9 +123,10 @@ func (j *NodeTrafficSyncJob) Run() {
online = []string{}
}
websocket.BroadcastTraffic(map[string]any{
"onlineClients": online,
"onlineByNode": j.inboundService.GetOnlineClientsByNode(),
"lastOnlineMap": lastOnline,
"onlineClients": online,
"onlineByNode": j.inboundService.GetOnlineClientsByNode(),
"activeInbounds": j.inboundService.GetActiveInboundsByNode(),
"lastOnlineMap": lastOnline,
})
clientStats := map[string]any{}

View File

@@ -82,7 +82,18 @@ func (j *XrayTrafficJob) Run() {
activeEmails = append(activeEmails, ct.Email)
}
}
j.inboundService.RefreshLocalOnlineClients(activeEmails)
// Pair the email signal with the inbound tags that moved bytes this poll.
// Xray's user>>>email counter aggregates across every inbound a client is
// attached to, so an online email alone can't say which inbound it used —
// gating the per-inbound view on these tags keeps a multi-inbound client
// off inbounds that saw no traffic. See issue #4859.
activeInboundTags := make([]string, 0, len(traffics))
for _, tr := range traffics {
if tr != nil && tr.IsInbound && tr.Up+tr.Down > 0 {
activeInboundTags = append(activeInboundTags, tr.Tag)
}
}
j.inboundService.RefreshLocalOnlineClients(activeEmails, activeInboundTags)
if !websocket.HasClients() {
return
@@ -97,6 +108,7 @@ func (j *XrayTrafficJob) Run() {
"clientTraffics": clientTraffics,
"onlineClients": onlineClients,
"onlineByNode": j.inboundService.GetOnlineClientsByNode(),
"activeInbounds": j.inboundService.GetActiveInboundsByNode(),
"lastOnlineMap": lastOnlineMap,
})

View File

@@ -3339,6 +3339,13 @@ func (s *InboundService) GetOnlineClientsByNode() map[int][]string {
return p.GetOnlineClientsByNode()
}
func (s *InboundService) GetActiveInboundsByNode() map[int][]string {
if p == nil {
return map[int][]string{}
}
return p.GetActiveInboundsByNode()
}
func (s *InboundService) SetNodeOnlineClients(nodeID int, emails []string) {
if p != nil {
p.SetNodeOnlineClients(nodeID, emails)
@@ -3365,13 +3372,14 @@ func (s *InboundService) GetClientsLastOnline() (map[string]int64, error) {
return result, nil
}
// RefreshLocalOnlineClients folds the emails active on this panel's own
// xray this poll into the local online set, applying the grace window and
// pruning stale entries. Pass nil to only prune. See xray.Process for why
// the local set is kept separate from the shared last_online column.
func (s *InboundService) RefreshLocalOnlineClients(activeEmails []string) {
// RefreshLocalOnlineClients folds the emails and inbound tags active on this
// panel's own xray this poll into the local online/active sets, applying the
// grace window and pruning stale entries. Pass nil to only prune. See
// xray.Process for why the local sets are kept separate from the shared
// last_online column.
func (s *InboundService) RefreshLocalOnlineClients(activeEmails, activeInboundTags []string) {
if p != nil {
p.RefreshLocalOnline(activeEmails, time.Now().UnixMilli(), onlineGracePeriodMs)
p.RefreshLocalOnline(activeEmails, activeInboundTags, time.Now().UnixMilli(), onlineGracePeriodMs)
}
}

View File

@@ -24,7 +24,7 @@ func assertSameSet(t *testing.T, label string, got, want []string) {
// client online on one node must not be reported online on any other node.
func TestGetOnlineClientsByNodeScopesPerNode(t *testing.T) {
p := newOnlineTestProcess()
p.RefreshLocalOnline([]string{"user1"}, 1000, 20000)
p.RefreshLocalOnline([]string{"user1"}, nil, 1000, 20000)
p.SetNodeOnlineClients(3, []string{"user1", "user2"})
p.SetNodeOnlineClients(5, []string{"user3"})
@@ -63,7 +63,7 @@ func TestGetOnlineClientsByNodeOmitsEmptyGroups(t *testing.T) {
// client-centric / total-count views) still merges every node and dedupes.
func TestGetOnlineClientsUnionDedupes(t *testing.T) {
p := newOnlineTestProcess()
p.RefreshLocalOnline([]string{"user1"}, 1000, 20000)
p.RefreshLocalOnline([]string{"user1"}, nil, 1000, 20000)
p.SetNodeOnlineClients(3, []string{"user1", "user2"})
assertSameSet(t, "union", p.GetOnlineClients(), []string{"user1", "user2"})
@@ -76,18 +76,18 @@ func TestRefreshLocalOnlineGraceWindow(t *testing.T) {
p := newOnlineTestProcess()
const grace = 20000
p.RefreshLocalOnline([]string{"user1"}, 1000, grace)
p.RefreshLocalOnline([]string{"user1"}, nil, 1000, grace)
if got := p.GetOnlineClientsByNode()[localNodeKey]; !slices.Contains(got, "user1") {
t.Fatalf("user1 should be online right after activity, got %v", got)
}
p.RefreshLocalOnline([]string{"user2"}, 11000, grace)
p.RefreshLocalOnline([]string{"user2"}, nil, 11000, grace)
got := p.GetOnlineClientsByNode()[localNodeKey]
if !slices.Contains(got, "user1") || !slices.Contains(got, "user2") {
t.Fatalf("both within grace window, got %v", got)
}
p.RefreshLocalOnline(nil, 22000, grace)
p.RefreshLocalOnline(nil, nil, 22000, grace)
got = p.GetOnlineClientsByNode()[localNodeKey]
if slices.Contains(got, "user1") {
t.Errorf("user1 (idle 21s, past grace) should have aged out, got %v", got)
@@ -97,6 +97,32 @@ func TestRefreshLocalOnlineGraceWindow(t *testing.T) {
}
}
// TestGetActiveInboundsByNodeTracksGraceWindow pins the fix for issue #4859: a
// multi-inbound client must only count as online on inbounds that actually
// carried traffic. The active-inbound signal honours the same grace window as
// the online-email signal, and only this panel's tags report under key 0.
func TestGetActiveInboundsByNodeTracksGraceWindow(t *testing.T) {
p := newOnlineTestProcess()
const grace = 20000
p.RefreshLocalOnline([]string{"alice"}, []string{"inbound-a"}, 1000, grace)
got := p.GetActiveInboundsByNode()[localNodeKey]
assertSameSet(t, "active after first poll", got, []string{"inbound-a"})
p.RefreshLocalOnline([]string{"alice"}, []string{"inbound-b"}, 11000, grace)
got = p.GetActiveInboundsByNode()[localNodeKey]
assertSameSet(t, "both within grace", got, []string{"inbound-a", "inbound-b"})
p.RefreshLocalOnline(nil, nil, 22000, grace)
got = p.GetActiveInboundsByNode()[localNodeKey]
assertSameSet(t, "inbound-a (idle 21s, past grace) aged out, inbound-b kept", got, []string{"inbound-b"})
p.RefreshLocalOnline(nil, nil, 40000, grace)
if _, ok := p.GetActiveInboundsByNode()[localNodeKey]; ok {
t.Errorf("all inbounds idle past grace, key 0 should be absent: %v", p.GetActiveInboundsByNode())
}
}
// TestClearNodeOnlineClientsDropsNode mirrors a failed node probe: the node's
// clients must disappear from the per-node map immediately.
func TestClearNodeOnlineClientsDropsNode(t *testing.T) {

View File

@@ -135,6 +135,13 @@ type process struct {
// snapshots — so a client connected solely to a remote node is not
// reported online on local inbounds.
onlineClients []string
// localActiveInbounds is the set of THIS panel's inbound tags that
// carried traffic within the same grace window. Xray's user>>>email
// stat aggregates across every inbound a client is attached to, so an
// online email alone can't say which inbound it actually used. Pairing
// it with the inbound>>>tag stat lets the per-inbound view drop a
// multi-inbound client from inbounds that saw no traffic this window.
localActiveInbounds []string
// localLastOnline records, per email, the last time this panel's own
// xray reported traffic for it. RefreshLocalOnline rebuilds
// onlineClients from this map each tick, keeping the local online set
@@ -142,6 +149,12 @@ type process struct {
// column is bumped by remote-node syncs too and would otherwise leak
// remote-only clients into the local set.
localLastOnline map[string]int64
// localInboundLastActive mirrors localLastOnline for inbound tags: the
// last tick this panel's xray reported traffic through each tag.
// Rebuilt into localActiveInbounds under the same grace window so the
// two signals stay aligned — an email within grace always has the
// inbound it used within grace too.
localInboundLastActive map[string]int64
// nodeOnlineClients holds the online-emails list reported by each
// remote node, keyed by node id. NodeTrafficSyncJob populates entries
// per cron tick and clears them when a node's probe fails. The mutex
@@ -296,13 +309,33 @@ func (p *Process) GetOnlineClientsByNode() map[int][]string {
return out
}
// RefreshLocalOnline records that each email in activeEmails had local xray
// traffic at now, then rebuilds onlineClients from every email seen within
// graceMs and prunes entries older than that. Called by the local
// XrayTrafficJob after each xray gRPC stats poll. Pass a nil/empty
// activeEmails to only prune — NodeTrafficSyncJob does this so a stopped
// local xray's clients still age out between local traffic polls.
func (p *Process) RefreshLocalOnline(activeEmails []string, now, graceMs int64) {
// GetActiveInboundsByNode returns the inbound tags that carried traffic within
// the grace window, grouped by node. Only this panel's own xray reports
// per-inbound activity (under localNodeKey); remote-node snapshots don't carry
// it, so their nodes are simply absent — the per-inbound view reads "node
// missing" as "don't gate" and falls back to the email-only signal there.
// Empty groups are omitted, mirroring GetOnlineClientsByNode.
func (p *Process) GetActiveInboundsByNode() map[int][]string {
p.onlineMu.RLock()
defer p.onlineMu.RUnlock()
if len(p.localActiveInbounds) == 0 {
return map[int][]string{}
}
out := make(map[int][]string, 1)
local := make([]string, len(p.localActiveInbounds))
copy(local, p.localActiveInbounds)
out[localNodeKey] = local
return out
}
// RefreshLocalOnline records that each email in activeEmails and each tag in
// activeInboundTags had local xray traffic at now, then rebuilds onlineClients
// and localActiveInbounds from every entry seen within graceMs, pruning older
// ones. Called by the local XrayTrafficJob after each xray gRPC stats poll.
// Pass nil/empty slices to only prune — NodeTrafficSyncJob does this so a
// stopped local xray's clients and inbounds still age out between local polls.
func (p *Process) RefreshLocalOnline(activeEmails, activeInboundTags []string, now, graceMs int64) {
p.onlineMu.Lock()
defer p.onlineMu.Unlock()
if p.localLastOnline == nil {
@@ -320,6 +353,22 @@ func (p *Process) RefreshLocalOnline(activeEmails []string, now, graceMs int64)
}
}
p.onlineClients = online
if p.localInboundLastActive == nil {
p.localInboundLastActive = make(map[string]int64, len(activeInboundTags))
}
for _, tag := range activeInboundTags {
p.localInboundLastActive[tag] = now
}
activeInbounds := make([]string, 0, len(p.localInboundLastActive))
for tag, ts := range p.localInboundLastActive {
if now-ts < graceMs {
activeInbounds = append(activeInbounds, tag)
} else {
delete(p.localInboundLastActive, tag)
}
}
p.localActiveInbounds = activeInbounds
}
// SetNodeOnlineClients records the online-emails set for one remote