fix(node): keep client/inbound edits working when a node is offline (#4923, #4931)

Node-backed client and inbound edits no longer hard-fail when the backing node is offline or disabled. Edits commit to the panel DB immediately and reconcile to the node when it reconnects (eventual consistency); the panel is the single source of truth for desired config.

- Add Node.ConfigDirty/ConfigDirtyAt; mark a node dirty when an edit commits without reaching it (cleared via CAS on ConfigDirtyAt after a full reconcile).
- nodePushPlan() reads node state fresh from the DB, skips the push for offline/disabled nodes (no 10s hang), and treats push failures as non-fatal across every mutation path (client add/update/del + bulk + attach/detach; inbound add/update/del/toggle/resetTraffic).
- ReconcileNode() pushes the panel's desired config to a node on reconnect (refreshing the remote tag cache first) and prunes node-side orphans; runs before the traffic pull in the node sync job.
- While a node is dirty the traffic pull applies only up/down deltas and node-initiated disables, never overwriting desired config from a stale node snapshot.
- Surface a non-blocking 'saved; will sync on reconnect' warning to the UI.

Validated with a two-panel Docker E2E: client delete/update, attach/detach, and inbound add/delete all reconcile correctly offline -> reconnect.
This commit is contained in:
MHSanaei
2026-06-05 02:26:57 +02:00
parent e08456269b
commit b40f869f2a
16 changed files with 674 additions and 220 deletions

View File

@@ -396,6 +396,9 @@ type Node struct {
UptimeSecs uint64 `json:"uptimeSecs"`
LastError string `json:"lastError"`
ConfigDirty bool `json:"configDirty" gorm:"default:false"`
ConfigDirtyAt int64 `json:"configDirtyAt"`
InboundCount int `json:"inboundCount" gorm:"-"`
ClientCount int `json:"clientCount" gorm:"-"`
OnlineCount int `json:"onlineCount" gorm:"-"`

View File

@@ -322,6 +322,8 @@ export interface Node {
apiToken: string;
basePath: string;
clientCount: number;
configDirty: boolean;
configDirtyAt: number;
cpuPct: number;
createdAt: number;
depletedCount: number;

View File

@@ -339,6 +339,8 @@ export const NodeSchema = z.object({
apiToken: z.string(),
basePath: z.string(),
clientCount: z.number().int(),
configDirty: z.boolean(),
configDirtyAt: z.number().int(),
cpuPct: z.number(),
createdAt: z.number().int(),
depletedCount: z.number().int(),

View File

@@ -1,5 +1,6 @@
import axios from 'axios';
import type { AxiosError, AxiosRequestConfig, AxiosResponse } from 'axios';
import i18next from 'i18next';
import { getMessage } from './messageBus';
type RespEnvelope = { success?: unknown; msg?: unknown; obj?: unknown };
@@ -32,6 +33,14 @@ export class HttpUtil {
}
const messageType = msg.success ? 'success' : 'error';
getMessage()[messageType](msg.msg);
if (
msg.success &&
msg.obj &&
typeof msg.obj === 'object' &&
(msg.obj as { nodePending?: unknown }).nodePending === true
) {
getMessage().warning(i18next.t('pages.inbounds.toasts.savedNodeOfflineWillSync'));
}
}
static _respToMsg(resp: AxiosResponse | undefined): Msg {

View File

@@ -132,7 +132,7 @@ func (a *ClientController) create(c *gin.Context) {
jsonMsg(c, I18nWeb(c, "somethingWentWrong"), err)
return
}
jsonMsg(c, I18nWeb(c, "pages.inbounds.toasts.inboundClientAddSuccess"), nil)
jsonMsgObj(c, I18nWeb(c, "pages.inbounds.toasts.inboundClientAddSuccess"), pendingNodeObj(a.inboundService.AnyNodePending(payload.InboundIds)), nil)
if needRestart {
a.xrayService.SetToNeedRestart()
}
@@ -152,7 +152,7 @@ func (a *ClientController) update(c *gin.Context) {
jsonMsg(c, I18nWeb(c, "somethingWentWrong"), err)
return
}
jsonMsg(c, I18nWeb(c, "pages.inbounds.toasts.inboundClientUpdateSuccess"), nil)
jsonMsgObj(c, I18nWeb(c, "pages.inbounds.toasts.inboundClientUpdateSuccess"), pendingNodeObj(a.clientService.HasPendingNode(&a.inboundService, email)), nil)
if needRestart {
a.xrayService.SetToNeedRestart()
}
@@ -190,7 +190,7 @@ func (a *ClientController) attach(c *gin.Context) {
jsonMsg(c, I18nWeb(c, "somethingWentWrong"), err)
return
}
jsonMsg(c, I18nWeb(c, "pages.inbounds.toasts.inboundClientAddSuccess"), nil)
jsonMsgObj(c, I18nWeb(c, "pages.inbounds.toasts.inboundClientAddSuccess"), pendingNodeObj(a.inboundService.AnyNodePending(body.InboundIds)), nil)
if needRestart {
a.xrayService.SetToNeedRestart()
}
@@ -470,7 +470,7 @@ func (a *ClientController) detach(c *gin.Context) {
jsonMsg(c, I18nWeb(c, "somethingWentWrong"), err)
return
}
jsonMsg(c, I18nWeb(c, "pages.inbounds.toasts.inboundClientDeleteSuccess"), nil)
jsonMsgObj(c, I18nWeb(c, "pages.inbounds.toasts.inboundClientDeleteSuccess"), pendingNodeObj(a.inboundService.AnyNodePending(body.InboundIds)), nil)
if needRestart {
a.xrayService.SetToNeedRestart()
}

View File

@@ -182,6 +182,16 @@ func jsonMsgObj(c *gin.Context, msg string, obj any, err error) {
c.JSON(http.StatusOK, m)
}
// pendingNodeObj returns a response object flagging that the save committed
// locally but a backing node was offline/disabled, so the change will be
// mirrored to the node once it reconnects. Returns nil when nothing is pending.
func pendingNodeObj(pending bool) any {
if pending {
return gin.H{"nodePending": true}
}
return nil
}
// pureJsonMsg sends a pure JSON message response with custom status code.
func pureJsonMsg(c *gin.Context, statusCode int, success bool, msg string) {
c.JSON(statusCode, entity.Msg{

View File

@@ -15,6 +15,7 @@ import (
const (
nodeTrafficSyncConcurrency = 8
nodeTrafficSyncRequestTimeout = 4 * time.Second
nodeReconcileTimeout = 30 * time.Second
)
type NodeTrafficSyncJob struct {
@@ -151,21 +152,37 @@ func (j *NodeTrafficSyncJob) Run() {
}
func (j *NodeTrafficSyncJob) syncOne(mgr *runtime.Manager, n *model.Node) {
ctx, cancel := context.WithTimeout(context.Background(), nodeTrafficSyncRequestTimeout)
defer cancel()
rt, err := mgr.RemoteFor(n)
if err != nil {
logger.Warning("node traffic sync: remote lookup failed for", n.Name, ":", err)
return
}
if n.ConfigDirty {
reconcileCtx, reconcileCancel := context.WithTimeout(context.Background(), nodeReconcileTimeout)
reconcileErr := j.inboundService.ReconcileNode(reconcileCtx, rt, n.Id)
reconcileCancel()
if reconcileErr != nil {
logger.Warning("node traffic sync: reconcile for", n.Name, "failed:", reconcileErr)
return
}
if clearErr := j.nodeService.ClearNodeDirty(n.Id, n.ConfigDirtyAt); clearErr != nil {
logger.Warning("node traffic sync: clear dirty for", n.Name, "failed:", clearErr)
}
j.structural.set()
}
ctx, cancel := context.WithTimeout(context.Background(), nodeTrafficSyncRequestTimeout)
defer cancel()
snap, err := rt.FetchTrafficSnapshot(ctx)
if err != nil {
logger.Warning("node traffic sync: fetch from", n.Name, "failed:", err)
j.inboundService.ClearNodeOnlineClients(n.Id)
return
}
changed, err := j.inboundService.SetRemoteTraffic(n.Id, snap)
_, _, dirty, _, _ := j.nodeService.NodeSyncState(n.Id)
changed, err := j.inboundService.SetRemoteTraffic(n.Id, snap, dirty)
if err != nil {
logger.Warning("node traffic sync: merge for", n.Name, "failed:", err)
return

View File

@@ -191,6 +191,19 @@ func (r *Remote) cacheDel(tag string) {
delete(r.remoteIDByTag, tag)
}
func (r *Remote) ListRemoteTags(ctx context.Context) ([]string, error) {
if err := r.refreshRemoteIDs(ctx); err != nil {
return nil, err
}
r.mu.RLock()
defer r.mu.RUnlock()
tags := make([]string, 0, len(r.remoteIDByTag))
for tag := range r.remoteIDByTag {
tags = append(tags, tag)
}
return tags, nil
}
func (r *Remote) refreshRemoteIDs(ctx context.Context) error {
env, err := r.do(ctx, http.MethodGet, "panel/api/inbounds/list", nil)
if err != nil {

View File

@@ -547,6 +547,17 @@ func validateClientSubID(subID string) error {
return nil
}
func (s *ClientService) HasPendingNode(inboundSvc *InboundService, email string) bool {
if strings.TrimSpace(email) == "" {
return false
}
ids, err := s.GetInboundIdsForEmail(nil, email)
if err != nil {
return false
}
return inboundSvc.AnyNodePending(ids)
}
func (s *ClientService) Create(inboundSvc *InboundService, payload *ClientCreatePayload) (bool, error) {
if payload == nil {
return false, common.NewError("empty payload")
@@ -1290,6 +1301,7 @@ func (s *ClientService) delInboundClients(inboundSvc *InboundService, inboundId
}
needRestart := false
markDirty := false
for _, r := range removed {
email := r.email
emailShared := sharedSet[strings.ToLower(strings.TrimSpace(email))]
@@ -1324,12 +1336,18 @@ func (s *ClientService) delInboundClients(inboundSvc *InboundService, inboundId
}
}
if oldInbound.NodeID != nil && len(email) > 0 {
rt, rterr := inboundSvc.runtimeFor(oldInbound)
if rterr != nil {
return needRestart, rterr
rt, push, dirty, perr := inboundSvc.nodePushPlan(oldInbound)
if perr != nil {
return needRestart, perr
}
if err1 := rt.DeleteUser(context.Background(), oldInbound, email); err1 != nil {
return needRestart, err1
if dirty {
markDirty = true
}
if push {
if err1 := rt.DeleteUser(context.Background(), oldInbound, email); err1 != nil {
logger.Warning("Error in deleting client on", rt.Name(), ":", err1)
markDirty = true
}
}
}
}
@@ -1344,6 +1362,11 @@ func (s *ClientService) delInboundClients(inboundSvc *InboundService, inboundId
if err := s.SyncInbound(db, inboundId, finalClients); err != nil {
return needRestart, err
}
if markDirty && oldInbound.NodeID != nil {
if dErr := (&NodeService{}).MarkNodeDirty(*oldInbound.NodeID); dErr != nil {
logger.Warning("mark node dirty failed:", dErr)
}
}
return needRestart, nil
}
@@ -2722,27 +2745,33 @@ func (s *ClientService) bulkAdjustInboundClients(
}
oldInbound.Settings = string(newSettings)
markDirty := false
if oldInbound.NodeID != nil {
rt, rterr := inboundSvc.runtimeFor(oldInbound)
if rterr != nil {
rt, push, dirty, perr := inboundSvc.nodePushPlan(oldInbound)
if perr != nil {
for email := range foundEmails {
res.perEmailSkipped[email] = rterr.Error()
res.perEmailSkipped[email] = perr.Error()
delete(foundEmails, email)
}
} else {
for email := range foundEmails {
entry := plan[email]
updated := *entry.record.ToClient()
if entry.applyExpiry {
updated.ExpiryTime = entry.newExpiry
}
if entry.applyTotal {
updated.TotalGB = entry.newTotal
}
updated.UpdatedAt = nowMs
if err1 := rt.UpdateUser(context.Background(), oldInbound, email, updated); err1 != nil {
res.perEmailSkipped[email] = err1.Error()
delete(foundEmails, email)
if dirty {
markDirty = true
}
if push {
for email := range foundEmails {
entry := plan[email]
updated := *entry.record.ToClient()
if entry.applyExpiry {
updated.ExpiryTime = entry.newExpiry
}
if entry.applyTotal {
updated.TotalGB = entry.newTotal
}
updated.UpdatedAt = nowMs
if err1 := rt.UpdateUser(context.Background(), oldInbound, email, updated); err1 != nil {
logger.Warning("Error in updating client on", rt.Name(), ":", err1)
markDirty = true
}
}
}
}
@@ -2765,6 +2794,10 @@ func (s *ClientService) bulkAdjustInboundClients(
res.perEmailSkipped[email] = txErr.Error()
}
}
} else if markDirty && oldInbound.NodeID != nil {
if dErr := (&NodeService{}).MarkNodeDirty(*oldInbound.NodeID); dErr != nil {
logger.Warning("mark node dirty failed:", dErr)
}
}
return res
@@ -3083,6 +3116,7 @@ func (s *ClientService) bulkDelInboundClients(
}
}
markDirty := false
if oldInbound.NodeID == nil {
rt, rterr := inboundSvc.runtimeFor(oldInbound)
if rterr != nil {
@@ -3104,17 +3138,22 @@ func (s *ClientService) bulkDelInboundClients(
}
}
} else {
rt, rterr := inboundSvc.runtimeFor(oldInbound)
if rterr != nil {
rt, push, dirty, perr := inboundSvc.nodePushPlan(oldInbound)
if perr != nil {
for email := range foundEmails {
res.perEmailSkipped[email] = rterr.Error()
res.perEmailSkipped[email] = perr.Error()
delete(foundEmails, email)
}
} else {
for email := range foundEmails {
if err1 := rt.DeleteUser(context.Background(), oldInbound, email); err1 != nil {
res.perEmailSkipped[email] = err1.Error()
delete(foundEmails, email)
if dirty {
markDirty = true
}
if push {
for email := range foundEmails {
if err1 := rt.DeleteUser(context.Background(), oldInbound, email); err1 != nil {
logger.Warning("Error in deleting client on", rt.Name(), ":", err1)
markDirty = true
}
}
}
}
@@ -3136,6 +3175,10 @@ func (s *ClientService) bulkDelInboundClients(
res.perEmailSkipped[email] = txErr.Error()
}
}
} else if markDirty && oldInbound.NodeID != nil {
if dErr := (&NodeService{}).MarkNodeDirty(*oldInbound.NodeID); dErr != nil {
logger.Warning("mark node dirty failed:", dErr)
}
}
return res
@@ -3608,50 +3651,61 @@ func (s *ClientService) addInboundClient(inboundSvc *InboundService, data *model
db := database.GetDB()
tx := db.Begin()
markDirty := false
defer func() {
if err != nil {
tx.Rollback()
} else {
tx.Commit()
return
}
tx.Commit()
if markDirty && oldInbound.NodeID != nil {
if dErr := (&NodeService{}).MarkNodeDirty(*oldInbound.NodeID); dErr != nil {
logger.Warning("mark node dirty failed:", dErr)
}
}
}()
needRestart := false
rt, rterr := inboundSvc.runtimeFor(oldInbound)
if rterr != nil {
if oldInbound.NodeID != nil {
err = rterr
return false, err
}
needRestart = true
} else if oldInbound.NodeID == nil {
for _, client := range clients {
if len(client.Email) == 0 {
needRestart = true
continue
}
inboundSvc.AddClientStat(tx, data.Id, &client)
if !client.Enable {
continue
}
cipher := ""
if oldInbound.Protocol == "shadowsocks" {
cipher = oldSettings["method"].(string)
}
err1 := rt.AddUser(context.Background(), oldInbound, map[string]any{
"email": client.Email,
"id": client.ID,
"auth": client.Auth,
"security": client.Security,
"flow": client.Flow,
"password": client.Password,
"cipher": cipher,
})
if err1 == nil {
logger.Debug("Client added on", rt.Name(), ":", client.Email)
} else {
logger.Debug("Error in adding client on", rt.Name(), ":", err1)
needRestart = true
rt, push, dirty, perr := inboundSvc.nodePushPlan(oldInbound)
if perr != nil {
err = perr
return false, err
}
if dirty {
markDirty = true
}
if oldInbound.NodeID == nil {
if !push {
needRestart = true
} else {
for _, client := range clients {
if len(client.Email) == 0 {
needRestart = true
continue
}
inboundSvc.AddClientStat(tx, data.Id, &client)
if !client.Enable {
continue
}
cipher := ""
if oldInbound.Protocol == "shadowsocks" {
cipher = oldSettings["method"].(string)
}
err1 := rt.AddUser(context.Background(), oldInbound, map[string]any{
"email": client.Email,
"id": client.ID,
"auth": client.Auth,
"security": client.Security,
"flow": client.Flow,
"password": client.Password,
"cipher": cipher,
})
if err1 == nil {
logger.Debug("Client added on", rt.Name(), ":", client.Email)
} else {
logger.Debug("Error in adding client on", rt.Name(), ":", err1)
needRestart = true
}
}
}
} else {
@@ -3659,9 +3713,12 @@ func (s *ClientService) addInboundClient(inboundSvc *InboundService, data *model
if len(client.Email) > 0 {
inboundSvc.AddClientStat(tx, data.Id, &client)
}
if err1 := rt.AddClient(context.Background(), oldInbound, client); err1 != nil {
err = err1
return false, err
if push {
if err1 := rt.AddClient(context.Background(), oldInbound, client); err1 != nil {
logger.Warning("Error in adding client on", rt.Name(), ":", err1)
markDirty = true
push = false
}
}
}
}
@@ -3839,11 +3896,17 @@ func (s *ClientService) UpdateInboundClient(inboundSvc *InboundService, data *mo
db := database.GetDB()
tx := db.Begin()
markDirty := false
defer func() {
if err != nil {
tx.Rollback()
} else {
tx.Commit()
return
}
tx.Commit()
if markDirty && oldInbound.NodeID != nil {
if dErr := (&NodeService{}).MarkNodeDirty(*oldInbound.NodeID); dErr != nil {
logger.Warning("mark node dirty failed:", dErr)
}
}
}()
@@ -3903,50 +3966,55 @@ func (s *ClientService) UpdateInboundClient(inboundSvc *InboundService, data *mo
}
needRestart := false
if len(oldEmail) > 0 {
rt, rterr := inboundSvc.runtimeFor(oldInbound)
if rterr != nil {
if oldInbound.NodeID != nil {
err = rterr
return false, err
}
needRestart = true
} else if oldInbound.NodeID == nil {
if oldClients[clientIndex].Enable {
err1 := rt.RemoveUser(context.Background(), oldInbound, oldEmail)
if err1 == nil {
logger.Debug("Old client deleted on", rt.Name(), ":", oldEmail)
} else if strings.Contains(err1.Error(), fmt.Sprintf("User %s not found.", oldEmail)) {
logger.Debug("User is already deleted. Nothing to do more...")
} else {
logger.Debug("Error in deleting client on", rt.Name(), ":", err1)
needRestart = true
rt, push, dirty, perr := inboundSvc.nodePushPlan(oldInbound)
if perr != nil {
err = perr
return false, err
}
if dirty {
markDirty = true
}
if oldInbound.NodeID == nil {
if !push {
needRestart = true
} else {
if oldClients[clientIndex].Enable {
err1 := rt.RemoveUser(context.Background(), oldInbound, oldEmail)
if err1 == nil {
logger.Debug("Old client deleted on", rt.Name(), ":", oldEmail)
} else if strings.Contains(err1.Error(), fmt.Sprintf("User %s not found.", oldEmail)) {
logger.Debug("User is already deleted. Nothing to do more...")
} else {
logger.Debug("Error in deleting client on", rt.Name(), ":", err1)
needRestart = true
}
}
if clients[0].Enable {
cipher := ""
if oldInbound.Protocol == "shadowsocks" {
cipher = oldSettings["method"].(string)
}
err1 := rt.AddUser(context.Background(), oldInbound, map[string]any{
"email": clients[0].Email,
"id": clients[0].ID,
"security": clients[0].Security,
"flow": clients[0].Flow,
"auth": clients[0].Auth,
"password": clients[0].Password,
"cipher": cipher,
})
if err1 == nil {
logger.Debug("Client edited on", rt.Name(), ":", clients[0].Email)
} else {
logger.Debug("Error in adding client on", rt.Name(), ":", err1)
needRestart = true
}
}
}
if clients[0].Enable {
cipher := ""
if oldInbound.Protocol == "shadowsocks" {
cipher = oldSettings["method"].(string)
}
err1 := rt.AddUser(context.Background(), oldInbound, map[string]any{
"email": clients[0].Email,
"id": clients[0].ID,
"security": clients[0].Security,
"flow": clients[0].Flow,
"auth": clients[0].Auth,
"password": clients[0].Password,
"cipher": cipher,
})
if err1 == nil {
logger.Debug("Client edited on", rt.Name(), ":", clients[0].Email)
} else {
logger.Debug("Error in adding client on", rt.Name(), ":", err1)
needRestart = true
}
}
} else {
} else if push {
if err1 := rt.UpdateUser(context.Background(), oldInbound, oldEmail, clients[0]); err1 != nil {
err = err1
return false, err
logger.Warning("Error in updating client on", rt.Name(), ":", err1)
markDirty = true
}
}
} else {
@@ -4038,6 +4106,7 @@ func (s *ClientService) DelInboundClient(inboundSvc *InboundService, inboundId i
}
}
needRestart := false
markDirty := false
if len(email) > 0 {
var enables []bool
@@ -4073,12 +4142,18 @@ func (s *ClientService) DelInboundClient(inboundSvc *InboundService, inboundId i
}
}
if oldInbound.NodeID != nil && len(email) > 0 {
rt, rterr := inboundSvc.runtimeFor(oldInbound)
if rterr != nil {
return false, rterr
rt, push, dirty, perr := inboundSvc.nodePushPlan(oldInbound)
if perr != nil {
return false, perr
}
if err1 := rt.DeleteUser(context.Background(), oldInbound, email); err1 != nil {
return false, err1
if dirty {
markDirty = true
}
if push {
if err1 := rt.DeleteUser(context.Background(), oldInbound, email); err1 != nil {
logger.Warning("Error in deleting client on", rt.Name(), ":", err1)
markDirty = true
}
}
}
if err := db.Save(oldInbound).Error; err != nil {
@@ -4091,6 +4166,11 @@ func (s *ClientService) DelInboundClient(inboundSvc *InboundService, inboundId i
if err := s.SyncInbound(db, inboundId, finalClients); err != nil {
return false, err
}
if markDirty && oldInbound.NodeID != nil {
if dErr := (&NodeService{}).MarkNodeDirty(*oldInbound.NodeID); dErr != nil {
logger.Warning("mark node dirty failed:", dErr)
}
}
return needRestart, nil
}
@@ -4159,6 +4239,7 @@ func (s *ClientService) DelInboundClientByEmail(inboundSvc *InboundService, inbo
}
needRestart := false
markDirty := false
if len(email) > 0 && !emailShared {
if !keepTraffic {
@@ -4175,25 +4256,29 @@ func (s *ClientService) DelInboundClientByEmail(inboundSvc *InboundService, inbo
}
if needApiDel {
rt, rterr := inboundSvc.runtimeFor(oldInbound)
if rterr != nil {
if oldInbound.NodeID != nil {
return false, rterr
}
needRestart = true
} else if oldInbound.NodeID == nil {
if err1 := rt.RemoveUser(context.Background(), oldInbound, email); err1 == nil {
rt, push, dirty, perr := inboundSvc.nodePushPlan(oldInbound)
if perr != nil {
return false, perr
}
if dirty {
markDirty = true
}
if oldInbound.NodeID == nil {
if !push {
needRestart = true
} else if err1 := rt.RemoveUser(context.Background(), oldInbound, email); err1 == nil {
logger.Debug("Client deleted on", rt.Name(), ":", email)
needRestart = false
} else if strings.Contains(err1.Error(), fmt.Sprintf("User %s not found.", email)) {
logger.Debug("User is already deleted. Nothing to do more...")
} else {
logger.Debug("Error in deleting client on", rt.Name(), ":", err1)
logger.Debug("Error in deleting client on", rt.Name(), ":", email)
needRestart = true
}
} else {
} else if push {
if err1 := rt.DeleteUser(context.Background(), oldInbound, email); err1 != nil {
return false, err1
logger.Warning("Error in deleting client on", rt.Name(), ":", err1)
markDirty = true
}
}
}
@@ -4209,6 +4294,11 @@ func (s *ClientService) DelInboundClientByEmail(inboundSvc *InboundService, inbo
if err := s.SyncInbound(db, inboundId, finalClients); err != nil {
return false, err
}
if markDirty && oldInbound.NodeID != nil {
if dErr := (&NodeService{}).MarkNodeDirty(*oldInbound.NodeID); dErr != nil {
logger.Warning("mark node dirty failed:", dErr)
}
}
return needRestart, nil
}

View File

@@ -62,7 +62,7 @@ func TestSetRemoteTraffic_PreservesPanelLocalGroupAndComment(t *testing.T) {
}
svc := InboundService{}
if _, err := svc.setRemoteTrafficLocked(nodeID, snap); err != nil {
if _, err := svc.setRemoteTrafficLocked(nodeID, snap, false); err != nil {
t.Fatalf("setRemoteTrafficLocked: %v", err)
}

View File

@@ -41,6 +41,92 @@ func (s *InboundService) runtimeFor(ib *model.Inbound) (runtime.Runtime, error)
return mgr.RuntimeFor(ib.NodeID)
}
func (s *InboundService) nodePushPlan(ib *model.Inbound) (runtime.Runtime, bool, bool, error) {
if ib.NodeID == nil {
rt, err := s.runtimeFor(ib)
if err != nil {
return nil, false, false, nil
}
return rt, true, false, nil
}
nodeSvc := NodeService{}
enabled, status, _, _, err := nodeSvc.NodeSyncState(*ib.NodeID)
if err != nil {
return nil, false, false, err
}
if !enabled || status == "offline" {
return nil, false, true, nil
}
rt, err := s.runtimeFor(ib)
if err != nil {
return nil, false, true, nil
}
return rt, true, false, nil
}
func (s *InboundService) NodeIsPending(nodeID *int) bool {
if nodeID == nil {
return false
}
return (&NodeService{}).IsNodePending(*nodeID)
}
func (s *InboundService) AnyNodePending(inboundIds []int) bool {
if len(inboundIds) == 0 {
return false
}
nodeSvc := NodeService{}
for _, id := range inboundIds {
ib, err := s.GetInbound(id)
if err != nil || ib.NodeID == nil {
continue
}
if nodeSvc.IsNodePending(*ib.NodeID) {
return true
}
}
return false
}
func (s *InboundService) ReconcileNode(ctx context.Context, rt *runtime.Remote, nodeID int) error {
if rt == nil || nodeID <= 0 {
return nil
}
db := database.GetDB()
var inbounds []*model.Inbound
if err := db.Model(model.Inbound{}).Where("node_id = ?", nodeID).Find(&inbounds).Error; err != nil {
return err
}
remoteTags, err := rt.ListRemoteTags(ctx)
if err != nil {
return err
}
prefix := nodeTagPrefix(&nodeID)
desiredTags := make(map[string]struct{}, len(inbounds)*2)
for _, ib := range inbounds {
desiredTags[ib.Tag] = struct{}{}
if prefix != "" {
if stripped, found := strings.CutPrefix(ib.Tag, prefix); found {
desiredTags[stripped] = struct{}{}
} else {
desiredTags[prefix+ib.Tag] = struct{}{}
}
}
if err := rt.UpdateInbound(ctx, ib, ib); err != nil {
return fmt.Errorf("reconcile inbound %q: %w", ib.Tag, err)
}
}
for _, tag := range remoteTags {
if _, want := desiredTags[tag]; want {
continue
}
if err := rt.DelInbound(ctx, &model.Inbound{Tag: tag}); err != nil {
return fmt.Errorf("reconcile delete %q: %w", tag, err)
}
}
return nil
}
type CopyClientsResult struct {
Added []string `json:"added"`
Skipped []string `json:"skipped"`
@@ -575,11 +661,17 @@ func (s *InboundService) AddInbound(inbound *model.Inbound) (*model.Inbound, boo
db := database.GetDB()
tx := db.Begin()
markDirty := false
defer func() {
if err == nil {
tx.Commit()
} else {
if err != nil {
tx.Rollback()
return
}
tx.Commit()
if markDirty && inbound.NodeID != nil {
if dErr := (&NodeService{}).MarkNodeDirty(*inbound.NodeID); dErr != nil {
logger.Warning("mark node dirty failed:", dErr)
}
}
}()
@@ -600,20 +692,25 @@ func (s *InboundService) AddInbound(inbound *model.Inbound) (*model.Inbound, boo
needRestart := false
if inbound.Enable {
rt, rterr := s.runtimeFor(inbound)
if rterr != nil {
err = rterr
rt, push, dirty, perr := s.nodePushPlan(inbound)
if perr != nil {
err = perr
return inbound, false, err
}
if err1 := rt.AddInbound(context.Background(), inbound); err1 == nil {
logger.Debug("New inbound added on", rt.Name(), ":", inbound.Tag)
} else {
logger.Debug("Unable to add inbound on", rt.Name(), ":", err1)
if inbound.NodeID != nil {
err = err1
return inbound, false, err
if dirty {
markDirty = true
}
if push {
if err1 := rt.AddInbound(context.Background(), inbound); err1 == nil {
logger.Debug("New inbound added on", rt.Name(), ":", inbound.Tag)
} else {
logger.Debug("Unable to add inbound on", rt.Name(), ":", err1)
if inbound.NodeID != nil {
markDirty = true
} else {
needRestart = true
}
}
needRestart = true
}
}
@@ -624,24 +721,31 @@ func (s *InboundService) DelInbound(id int) (bool, error) {
db := database.GetDB()
needRestart := false
markDirty := false
var ib model.Inbound
loadErr := db.Model(model.Inbound{}).Where("id = ?", id).First(&ib).Error
if loadErr == nil {
shouldPushToRuntime := ib.NodeID != nil || ib.Enable
if shouldPushToRuntime {
rt, rterr := s.runtimeFor(&ib)
if rterr != nil {
logger.Warning("DelInbound: runtime lookup failed, deleting central row anyway:", rterr)
if ib.NodeID == nil {
needRestart = true
}
} else if err1 := rt.DelInbound(context.Background(), &ib); err1 == nil {
logger.Debug("Inbound deleted on", rt.Name(), ":", ib.Tag)
} else {
logger.Warning("DelInbound on", rt.Name(), "failed, deleting central row anyway:", err1)
if ib.NodeID == nil {
needRestart = true
rt, push, dirty, perr := s.nodePushPlan(&ib)
if perr != nil {
logger.Warning("DelInbound: node lookup failed, deleting central row anyway:", perr)
markDirty = true
} else if push {
if err1 := rt.DelInbound(context.Background(), &ib); err1 == nil {
logger.Debug("Inbound deleted on", rt.Name(), ":", ib.Tag)
} else {
logger.Warning("DelInbound on", rt.Name(), "failed, deleting central row anyway:", err1)
if ib.NodeID == nil {
needRestart = true
} else {
markDirty = true
}
}
} else if ib.NodeID == nil {
needRestart = true
} else if dirty {
markDirty = true
}
} else {
logger.Debug("DelInbound: skipping runtime push for disabled local inbound id:", id)
@@ -657,6 +761,11 @@ func (s *InboundService) DelInbound(id int) (bool, error) {
if err := db.Delete(model.Inbound{}, id).Error; err != nil {
return needRestart, err
}
if markDirty && ib.NodeID != nil {
if dErr := (&NodeService{}).MarkNodeDirty(*ib.NodeID); dErr != nil {
logger.Warning("mark node dirty failed:", dErr)
}
}
if !database.IsPostgres() {
var count int64
if err := db.Model(&model.Inbound{}).Count(&count).Error; err != nil {
@@ -740,12 +849,9 @@ func (s *InboundService) SetInboundEnable(id int, enable bool) (bool, error) {
inbound.Enable = enable
needRestart := false
rt, rterr := s.runtimeFor(inbound)
if rterr != nil {
if inbound.NodeID != nil {
return false, rterr
}
return true, nil
rt, push, dirty, perr := s.nodePushPlan(inbound)
if perr != nil {
return false, perr
}
// Remote nodes interpret DelInbound as a real row delete (it hits
@@ -754,13 +860,24 @@ func (s *InboundService) SetInboundEnable(id int, enable bool) (bool, error) {
// PATCH the remote row via UpdateInbound instead — preserves the
// settings/client history and just flips the enable flag.
if inbound.NodeID != nil {
if err := rt.UpdateInbound(context.Background(), inbound, inbound); err != nil {
logger.Debug("SetInboundEnable: remote UpdateInbound on", rt.Name(), "failed:", err)
return false, err
if push {
if err := rt.UpdateInbound(context.Background(), inbound, inbound); err != nil {
logger.Warning("SetInboundEnable: remote UpdateInbound on", rt.Name(), "failed:", err)
dirty = true
}
}
if dirty {
if dErr := (&NodeService{}).MarkNodeDirty(*inbound.NodeID); dErr != nil {
logger.Warning("mark node dirty failed:", dErr)
}
}
return false, nil
}
if !push {
return true, nil
}
if err := rt.DelInbound(context.Background(), inbound); err != nil &&
!strings.Contains(err.Error(), "not found") {
logger.Debug("SetInboundEnable: DelInbound on", rt.Name(), "failed:", err)
@@ -807,11 +924,17 @@ func (s *InboundService) UpdateInbound(inbound *model.Inbound) (*model.Inbound,
db := database.GetDB()
tx := db.Begin()
markDirty := false
defer func() {
if err != nil {
tx.Rollback()
} else {
tx.Commit()
return
}
tx.Commit()
if markDirty && oldInbound.NodeID != nil {
if dErr := (&NodeService{}).MarkNodeDirty(*oldInbound.NodeID); dErr != nil {
logger.Warning("mark node dirty failed:", dErr)
}
}
}()
@@ -900,17 +1023,20 @@ func (s *InboundService) UpdateInbound(inbound *model.Inbound) (*model.Inbound,
inbound.Tag = oldInbound.Tag
needRestart := false
rt, rterr := s.runtimeFor(oldInbound)
if rterr != nil {
if oldInbound.NodeID != nil {
err = rterr
return inbound, false, err
}
needRestart = true
} else {
oldSnapshot := *oldInbound
oldSnapshot.Tag = tag
if oldInbound.NodeID == nil {
rt, push, dirty, perr := s.nodePushPlan(oldInbound)
if perr != nil {
err = perr
return inbound, false, err
}
if dirty {
markDirty = true
}
if oldInbound.NodeID == nil {
if !push {
needRestart = true
} else {
oldSnapshot := *oldInbound
oldSnapshot.Tag = tag
if err2 := rt.DelInbound(context.Background(), &oldSnapshot); err2 == nil {
logger.Debug("Old inbound deleted on", rt.Name(), ":", tag)
}
@@ -926,16 +1052,18 @@ func (s *InboundService) UpdateInbound(inbound *model.Inbound) (*model.Inbound,
needRestart = true
}
}
} else {
if !inbound.Enable {
if err2 := rt.DelInbound(context.Background(), &oldSnapshot); err2 != nil {
err = err2
return inbound, false, err
}
} else if err2 := rt.UpdateInbound(context.Background(), &oldSnapshot, oldInbound); err2 != nil {
err = err2
return inbound, false, err
}
} else if push {
oldSnapshot := *oldInbound
oldSnapshot.Tag = tag
if !inbound.Enable {
if err2 := rt.DelInbound(context.Background(), &oldSnapshot); err2 != nil {
logger.Warning("Unable to disable inbound on", rt.Name(), ":", err2)
markDirty = true
}
} else if err2 := rt.UpdateInbound(context.Background(), &oldSnapshot, oldInbound); err2 != nil {
logger.Warning("Unable to update inbound on", rt.Name(), ":", err2)
markDirty = true
}
}
@@ -1303,17 +1431,17 @@ func (s *InboundService) upsertNodeBaseline(tx *gorm.DB, nodeID int, email strin
}).Create(&model.NodeClientTraffic{NodeId: nodeID, Email: email, Up: up, Down: down}).Error
}
func (s *InboundService) SetRemoteTraffic(nodeID int, snap *runtime.TrafficSnapshot) (bool, error) {
func (s *InboundService) SetRemoteTraffic(nodeID int, snap *runtime.TrafficSnapshot, dirty bool) (bool, error) {
var structuralChange bool
err := submitTrafficWrite(func() error {
var inner error
structuralChange, inner = s.setRemoteTrafficLocked(nodeID, snap)
structuralChange, inner = s.setRemoteTrafficLocked(nodeID, snap, dirty)
return inner
})
return structuralChange, err
}
func (s *InboundService) setRemoteTrafficLocked(nodeID int, snap *runtime.TrafficSnapshot) (bool, error) {
func (s *InboundService) setRemoteTrafficLocked(nodeID int, snap *runtime.TrafficSnapshot, dirty bool) (bool, error) {
if snap == nil || nodeID <= 0 {
return false, nil
}
@@ -1425,6 +1553,9 @@ func (s *InboundService) setRemoteTrafficLocked(nodeID int, snap *runtime.Traffi
c, ok := tagToCentral[snapIb.Tag]
if !ok {
if dirty {
continue
}
// Try snap.Tag first; on collision fall back to the n<id>-
// prefixed form so local+node can both own the same port.
pickFreeTag := func() (string, error) {
@@ -1491,42 +1622,48 @@ func (s *InboundService) setRemoteTrafficLocked(nodeID int, snap *runtime.Traffi
inGrace := c.LastTrafficResetTime > 0 && now-c.LastTrafficResetTime < resetGracePeriodMs
updates := map[string]any{
"enable": snapIb.Enable,
"remark": snapIb.Remark,
"listen": snapIb.Listen,
"port": snapIb.Port,
"protocol": snapIb.Protocol,
"total": snapIb.Total,
"expiry_time": snapIb.ExpiryTime,
"settings": snapIb.Settings,
"stream_settings": snapIb.StreamSettings,
"sniffing": snapIb.Sniffing,
"traffic_reset": snapIb.TrafficReset,
updates := map[string]any{}
if !dirty {
updates["enable"] = snapIb.Enable
updates["remark"] = snapIb.Remark
updates["listen"] = snapIb.Listen
updates["port"] = snapIb.Port
updates["protocol"] = snapIb.Protocol
updates["total"] = snapIb.Total
updates["expiry_time"] = snapIb.ExpiryTime
updates["settings"] = snapIb.Settings
updates["stream_settings"] = snapIb.StreamSettings
updates["sniffing"] = snapIb.Sniffing
updates["traffic_reset"] = snapIb.TrafficReset
}
if !inGrace || (snapIb.Up+snapIb.Down) <= (c.Up+c.Down) {
updates["up"] = snapIb.Up
updates["down"] = snapIb.Down
}
if c.Settings != snapIb.Settings ||
if !dirty && (c.Settings != snapIb.Settings ||
c.Remark != snapIb.Remark ||
c.Listen != snapIb.Listen ||
c.Port != snapIb.Port ||
c.Total != snapIb.Total ||
c.ExpiryTime != snapIb.ExpiryTime ||
c.Enable != snapIb.Enable {
c.Enable != snapIb.Enable) {
structuralChange = true
}
if err := tx.Model(model.Inbound{}).
Where("id = ?", c.Id).
Updates(updates).Error; err != nil {
return false, err
if len(updates) > 0 {
if err := tx.Model(model.Inbound{}).
Where("id = ?", c.Id).
Updates(updates).Error; err != nil {
return false, err
}
}
}
for _, c := range central {
if dirty {
continue
}
if _, kept := snapTags[c.Tag]; kept {
continue
}
@@ -1581,6 +1718,9 @@ func (s *InboundService) setRemoteTrafficLocked(nodeID int, snap *runtime.Traffi
}
if _, rowExists := existingEmails[cs.Email]; !rowExists {
if dirty {
continue
}
row := &xray.ClientTraffic{
InboundId: c.Id,
Email: cs.Email,
@@ -1642,6 +1782,9 @@ func (s *InboundService) setRemoteTrafficLocked(nodeID int, snap *runtime.Traffi
}
for k, existing := range centralCS {
if dirty {
continue
}
if k.inboundID != c.Id {
continue
}
@@ -1673,6 +1816,9 @@ func (s *InboundService) setRemoteTrafficLocked(nodeID int, snap *runtime.Traffi
if !ok {
continue
}
if dirty {
continue
}
var oldEmailsRows []string
if err := tx.Table("clients").
Joins("JOIN client_inbounds ON client_inbounds.client_id = clients.id").
@@ -2674,12 +2820,20 @@ func (s *InboundService) resetClientTrafficLocked(id int, clientEmail string) (b
}
for _, client := range clients {
if client.Email == clientEmail && client.Enable {
rt, rterr := s.runtimeFor(inbound)
if rterr != nil {
rt, push, dirty, perr := s.nodePushPlan(inbound)
if perr != nil {
return false, perr
}
if !push {
if inbound.NodeID != nil {
return false, rterr
if dirty {
if dErr := (&NodeService{}).MarkNodeDirty(*inbound.NodeID); dErr != nil {
logger.Warning("mark node dirty failed:", dErr)
}
}
} else {
needRestart = true
}
needRestart = true
break
}
cipher := ""
@@ -2702,6 +2856,11 @@ func (s *InboundService) resetClientTrafficLocked(id int, clientEmail string) (b
})
if err1 == nil {
logger.Debug("Client enabled on", rt.Name(), "due to reset traffic:", clientEmail)
} else if inbound.NodeID != nil {
logger.Warning("Error in enabling client on", rt.Name(), ":", err1)
if dErr := (&NodeService{}).MarkNodeDirty(*inbound.NodeID); dErr != nil {
logger.Warning("mark node dirty failed:", dErr)
}
} else {
logger.Debug("Error in enabling client on", rt.Name(), ":", err1)
needRestart = true

View File

@@ -480,6 +480,50 @@ func (s *NodeService) UpdateHeartbeat(id int, p HeartbeatPatch) error {
return nil
}
func (s *NodeService) MarkNodeDirty(id int) error {
if id <= 0 {
return nil
}
return database.GetDB().Model(model.Node{}).
Where("id = ?", id).
Updates(map[string]any{
"config_dirty": true,
"config_dirty_at": time.Now().UnixMilli(),
}).Error
}
func (s *NodeService) ClearNodeDirty(id int, dirtyAt int64) error {
if id <= 0 {
return nil
}
return database.GetDB().Model(model.Node{}).
Where("id = ? AND config_dirty_at = ?", id, dirtyAt).
Update("config_dirty", false).Error
}
func (s *NodeService) NodeSyncState(id int) (enabled bool, status string, dirty bool, dirtyAt int64, err error) {
if id <= 0 {
return false, "", false, 0, errors.New("invalid node id")
}
var row model.Node
err = database.GetDB().Model(model.Node{}).
Select("enable", "status", "config_dirty", "config_dirty_at").
Where("id = ?", id).
First(&row).Error
if err != nil {
return false, "", false, 0, err
}
return row.Enable, row.Status, row.ConfigDirty, row.ConfigDirtyAt, nil
}
func (s *NodeService) IsNodePending(id int) bool {
enabled, status, dirty, _, err := s.NodeSyncState(id)
if err != nil {
return false
}
return !enabled || status != "online" || dirty
}
func nodeMetricKey(id int, metric string) string {
return "node:" + strconv.Itoa(id) + ":" + metric
}

View File

@@ -36,7 +36,7 @@ func syncNode(t *testing.T, svc *InboundService, nodeID int, tag string, stats .
snap := &runtime.TrafficSnapshot{
Inbounds: []*model.Inbound{{Tag: tag, ClientStats: stats}},
}
if _, err := svc.setRemoteTrafficLocked(nodeID, snap); err != nil {
if _, err := svc.setRemoteTrafficLocked(nodeID, snap, false); err != nil {
t.Fatalf("setRemoteTrafficLocked node %d: %v", nodeID, err)
}
}

View File

@@ -0,0 +1,104 @@
package service
import (
"testing"
"github.com/mhsanaei/3x-ui/v3/database"
"github.com/mhsanaei/3x-ui/v3/database/model"
"github.com/mhsanaei/3x-ui/v3/web/runtime"
)
// While a node is config-dirty (a local edit committed before it could be
// mirrored to the node), the traffic pull must not overwrite the central
// inbound's config columns from the node's stale snapshot — only traffic
// counters may advance. Otherwise a reconnecting node reverts the edit.
func TestSetRemoteTraffic_DirtyPreservesConfig(t *testing.T) {
setupConflictDB(t)
db := database.GetDB()
node := &model.Node{Name: "n1", Address: "127.0.0.1", Port: 2096, ApiToken: "tok", Enable: true, Status: "online"}
if err := db.Create(node).Error; err != nil {
t.Fatalf("create node: %v", err)
}
id := node.Id
const desiredSettings = `{"clients":[{"email":"a@x"}]}`
central := &model.Inbound{
UserId: 1,
NodeID: &id,
Tag: "in-443-tcp",
Enable: true,
Port: 443,
Protocol: model.VLESS,
Settings: desiredSettings,
}
if err := db.Create(central).Error; err != nil {
t.Fatalf("create inbound: %v", err)
}
snap := &runtime.TrafficSnapshot{
Inbounds: []*model.Inbound{{
Tag: "in-443-tcp",
Enable: true,
Port: 443,
Protocol: model.VLESS,
Settings: `{"clients":[{"email":"b@x"}]}`,
Up: 500,
Down: 700,
}},
}
svc := InboundService{}
if _, err := svc.setRemoteTrafficLocked(id, snap, true); err != nil {
t.Fatalf("setRemoteTrafficLocked dirty: %v", err)
}
var got model.Inbound
if err := db.First(&got, central.Id).Error; err != nil {
t.Fatalf("reload inbound: %v", err)
}
if got.Settings != desiredSettings {
t.Fatalf("dirty pull overwrote settings: want %q got %q", desiredSettings, got.Settings)
}
if got.Up != 500 || got.Down != 700 {
t.Fatalf("traffic counters not applied while dirty: up=%d down=%d", got.Up, got.Down)
}
}
// ClearNodeDirty must be a compare-and-swap on config_dirty_at so a concurrent
// edit that re-dirties the node during a reconcile is not silently cleared.
func TestNodeDirty_ClearIsCASOnDirtyAt(t *testing.T) {
setupConflictDB(t)
db := database.GetDB()
node := &model.Node{Name: "n2", Address: "127.0.0.1", Port: 2096, ApiToken: "tok", Enable: true, Status: "online"}
if err := db.Create(node).Error; err != nil {
t.Fatalf("create node: %v", err)
}
nodeSvc := NodeService{}
if err := nodeSvc.MarkNodeDirty(node.Id); err != nil {
t.Fatalf("MarkNodeDirty: %v", err)
}
_, _, dirty, dirtyAt, err := nodeSvc.NodeSyncState(node.Id)
if err != nil {
t.Fatalf("NodeSyncState: %v", err)
}
if !dirty {
t.Fatal("node should be dirty after MarkNodeDirty")
}
if err := nodeSvc.ClearNodeDirty(node.Id, dirtyAt-1); err != nil {
t.Fatalf("ClearNodeDirty stale token: %v", err)
}
if _, _, stillDirty, _, _ := nodeSvc.NodeSyncState(node.Id); !stillDirty {
t.Fatal("stale-token clear must not clear the dirty flag")
}
if err := nodeSvc.ClearNodeDirty(node.Id, dirtyAt); err != nil {
t.Fatalf("ClearNodeDirty matching token: %v", err)
}
if _, _, stillDirty, _, _ := nodeSvc.NodeSyncState(node.Id); stillDirty {
t.Fatal("matching-token clear must clear the dirty flag")
}
}

View File

@@ -46,7 +46,7 @@ func TestSetRemoteTraffic_KeepsInboundOnPrefixMismatch(t *testing.T) {
}
svc := InboundService{}
if _, err := svc.setRemoteTrafficLocked(nodeID, snap); err != nil {
if _, err := svc.setRemoteTrafficLocked(nodeID, snap, false); err != nil {
t.Fatalf("setRemoteTrafficLocked: %v", err)
}

View File

@@ -470,6 +470,7 @@
"inboundClientAddSuccess": "Inbound client(s) have been added.",
"inboundClientDeleteSuccess": "Inbound client has been deleted.",
"inboundClientUpdateSuccess": "Inbound client has been updated.",
"savedNodeOfflineWillSync": "Saved locally. A backing node is offline or disabled — the change will sync once it reconnects.",
"delDepletedClientsSuccess": "All depleted clients have been deleted.",
"resetAllClientTrafficSuccess": "Traffic for all clients has been reset.",
"resetAllTrafficSuccess": "All traffic has been reset.",