From 15787dbdfeb9a545eae35fb3d6c1c925f06b283b Mon Sep 17 00:00:00 2001 From: MHSanaei Date: Wed, 27 May 2026 00:30:40 +0200 Subject: [PATCH] perf(clients): batch BulkAdjust per inbound, skip no-op xray calls on local MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Same per-inbound batching strategy as BulkDelete. The previous code called Update once per email, which itself looped through each inbound the client belonged to — reparsing the same settings JSON, calling RemoveUser+AddUser on xray, and running SyncInbound for every single email. For 200 emails in one inbound that's 200 JSON read/write cycles and 400 xray runtime calls. The new BulkAdjust groups emails by inbound and per inbound: - locks once, reads settings JSON once - mutates expiryTime/totalGB in place for every target client - writes the inbound and runs SyncInbound once ClientTraffic rows are updated with a single per-email query at the end (values differ per client so they can't be folded into one statement). For local-node inbounds the xray runtime calls are skipped entirely. The AddUser payload only contains email/id/security/flow/auth/password/ cipher — none of which change in an adjust — so RemoveUser+AddUser was a no-op that briefly flapped active users. Limit enforcement is driven by the panel's traffic loop reading ClientTraffic, not by xray-core. For remote-node inbounds rt.UpdateUser is preserved so the remote panel receives the new totals/expiry. Skip+report semantics match BulkDelete: any per-email error leaves that email's record/traffic untouched and is returned in Skipped[]. --- web/service/client.go | 309 ++++++++++++++++++++++++++++++++++++++---- 1 file changed, 282 insertions(+), 27 deletions(-) diff --git a/web/service/client.go b/web/service/client.go index 93c207e8..5dfcb6d7 100644 --- a/web/service/client.go +++ b/web/service/client.go @@ -1185,86 +1185,341 @@ type BulkAdjustReport struct { Reason string `json:"reason"` } +type bulkAdjustEntry struct { + record *model.ClientRecord + applyExpiry bool + newExpiry int64 + applyTotal bool + newTotal int64 +} + // BulkAdjust shifts ExpiryTime by addDays (days) and TotalGB by addBytes // for every email in the list. Clients whose corresponding field is // unlimited (0) are skipped — bulk extend should not accidentally // limit an unlimited client. addDays and addBytes may be negative. +// +// Like BulkDelete, the work is grouped by inbound so each inbound's +// settings JSON is parsed and written exactly once regardless of how +// many target emails it contains. func (s *ClientService) BulkAdjust(inboundSvc *InboundService, emails []string, addDays int, addBytes int64) (BulkAdjustResult, bool, error) { result := BulkAdjustResult{} - needRestart := false if len(emails) == 0 { - return result, needRestart, nil + return result, false, nil } if addDays == 0 && addBytes == 0 { - return result, needRestart, common.NewError("no adjustment specified") + return result, false, common.NewError("no adjustment specified") } addExpiryMs := int64(addDays) * 24 * 60 * 60 * 1000 - for _, email := range emails { - email = strings.TrimSpace(email) - if email == "" { + seen := map[string]struct{}{} + cleanEmails := make([]string, 0, len(emails)) + for _, e := range emails { + e = strings.TrimSpace(e) + if e == "" { continue } - rec, err := s.GetRecordByEmail(nil, email) - if err != nil { - result.Skipped = append(result.Skipped, BulkAdjustReport{Email: email, Reason: err.Error()}) + if _, ok := seen[e]; ok { continue } - client := rec.ToClient() + seen[e] = struct{}{} + cleanEmails = append(cleanEmails, e) + } + if len(cleanEmails) == 0 { + return result, false, nil + } - applied := false + db := database.GetDB() + + var records []model.ClientRecord + if err := db.Where("email IN ?", cleanEmails).Find(&records).Error; err != nil { + return result, false, err + } + recordsByEmail := make(map[string]*model.ClientRecord, len(records)) + for i := range records { + recordsByEmail[records[i].Email] = &records[i] + } + + skippedReasons := map[string]string{} + for _, email := range cleanEmails { + if _, ok := recordsByEmail[email]; !ok { + skippedReasons[email] = "client not found" + } + } + + plan := map[string]*bulkAdjustEntry{} + for email, rec := range recordsByEmail { + entry := &bulkAdjustEntry{record: rec} if addDays != 0 { switch { case rec.ExpiryTime == 0: - result.Skipped = append(result.Skipped, BulkAdjustReport{Email: email, Reason: "unlimited expiry"}) + if _, exists := skippedReasons[email]; !exists { + skippedReasons[email] = "unlimited expiry" + } case rec.ExpiryTime > 0: next := rec.ExpiryTime + addExpiryMs if next <= 0 { - result.Skipped = append(result.Skipped, BulkAdjustReport{Email: email, Reason: "reduction exceeds remaining time"}) + if _, exists := skippedReasons[email]; !exists { + skippedReasons[email] = "reduction exceeds remaining time" + } } else { - client.ExpiryTime = next - applied = true + entry.applyExpiry = true + entry.newExpiry = next } default: next := rec.ExpiryTime - addExpiryMs if next >= 0 { - result.Skipped = append(result.Skipped, BulkAdjustReport{Email: email, Reason: "reduction exceeds delay window"}) + if _, exists := skippedReasons[email]; !exists { + skippedReasons[email] = "reduction exceeds delay window" + } } else { - client.ExpiryTime = next - applied = true + entry.applyExpiry = true + entry.newExpiry = next } } } if addBytes != 0 { if rec.TotalGB == 0 { - result.Skipped = append(result.Skipped, BulkAdjustReport{Email: email, Reason: "unlimited traffic"}) + if _, exists := skippedReasons[email]; !exists { + skippedReasons[email] = "unlimited traffic" + } } else { next := rec.TotalGB + addBytes if next < 0 { next = 0 } - client.TotalGB = next - applied = true + entry.applyTotal = true + entry.newTotal = next } } - if !applied { - continue + if entry.applyExpiry || entry.applyTotal { + plan[email] = entry } + } - nr, err := s.Update(inboundSvc, rec.Id, *client) - if err != nil { - result.Skipped = append(result.Skipped, BulkAdjustReport{Email: email, Reason: err.Error()}) + if len(plan) == 0 { + for email, reason := range skippedReasons { + result.Skipped = append(result.Skipped, BulkAdjustReport{Email: email, Reason: reason}) + } + return result, false, nil + } + + plannedIds := make([]int, 0, len(plan)) + recordIdToEmail := make(map[int]string, len(plan)) + for email, entry := range plan { + plannedIds = append(plannedIds, entry.record.Id) + recordIdToEmail[entry.record.Id] = email + } + + var mappings []model.ClientInbound + if err := db.Where("client_id IN ?", plannedIds).Find(&mappings).Error; err != nil { + return result, false, err + } + emailsByInbound := map[int][]string{} + for _, m := range mappings { + email, ok := recordIdToEmail[m.ClientId] + if !ok { continue } - if nr { + emailsByInbound[m.InboundId] = append(emailsByInbound[m.InboundId], email) + } + + needRestart := false + for inboundId, ibEmails := range emailsByInbound { + ibRes := s.bulkAdjustInboundClients(inboundSvc, inboundId, ibEmails, plan) + if ibRes.needRestart { needRestart = true } + for email, reason := range ibRes.perEmailSkipped { + if _, already := skippedReasons[email]; !already { + skippedReasons[email] = reason + } + } + } + + for email, entry := range plan { + if _, skipped := skippedReasons[email]; skipped { + continue + } + updates := map[string]any{} + if entry.applyExpiry { + updates["expiry_time"] = entry.newExpiry + } + if entry.applyTotal { + updates["total"] = entry.newTotal + } + if len(updates) == 0 { + continue + } + if err := db.Model(xray.ClientTraffic{}).Where("email = ?", email).Updates(updates).Error; err != nil { + if _, already := skippedReasons[email]; !already { + skippedReasons[email] = err.Error() + } + continue + } result.Adjusted++ } + + for email, reason := range skippedReasons { + result.Skipped = append(result.Skipped, BulkAdjustReport{Email: email, Reason: reason}) + } return result, needRestart, nil } +type bulkInboundAdjustResult struct { + perEmailSkipped map[string]string + needRestart bool +} + +// bulkAdjustInboundClients applies expiry/total deltas to multiple clients +// inside a single inbound's settings JSON. The xray runtime is updated +// only for remote-node inbounds; local nodes do not need a notification +// because the AddUser payload does not include totalGB/expiryTime — +// changing those fields is identity-preserving and the panel's traffic +// enforcement loop picks up the new limits from ClientTraffic directly. +func (s *ClientService) bulkAdjustInboundClients( + inboundSvc *InboundService, + inboundId int, + emails []string, + plan map[string]*bulkAdjustEntry, +) bulkInboundAdjustResult { + res := bulkInboundAdjustResult{perEmailSkipped: map[string]string{}} + + defer lockInbound(inboundId).Unlock() + + oldInbound, err := inboundSvc.GetInbound(inboundId) + if err != nil { + logger.Error("Load Old Data Error") + for _, e := range emails { + res.perEmailSkipped[e] = err.Error() + } + return res + } + + var settings map[string]any + if err := json.Unmarshal([]byte(oldInbound.Settings), &settings); err != nil { + for _, e := range emails { + res.perEmailSkipped[e] = err.Error() + } + return res + } + + clientKey := "id" + switch oldInbound.Protocol { + case model.Trojan: + clientKey = "password" + case model.Shadowsocks: + clientKey = "email" + case model.Hysteria, model.Hysteria2: + clientKey = "auth" + } + + keyToEmail := make(map[string]string, len(emails)) + for _, email := range emails { + entry := plan[email] + if entry == nil { + res.perEmailSkipped[email] = "client not found" + continue + } + key := clientKeyForProtocol(oldInbound.Protocol, entry.record) + if key == "" { + res.perEmailSkipped[email] = "missing client key for protocol" + continue + } + keyToEmail[key] = email + } + + interfaceClients, _ := settings["clients"].([]any) + foundEmails := map[string]bool{} + nowMs := time.Now().Unix() * 1000 + for i, client := range interfaceClients { + c, ok := client.(map[string]any) + if !ok { + continue + } + cKey, _ := c[clientKey].(string) + targetEmail, found := keyToEmail[cKey] + if !found { + continue + } + entry := plan[targetEmail] + if entry.applyExpiry { + c["expiryTime"] = entry.newExpiry + } + if entry.applyTotal { + c["totalGB"] = entry.newTotal + } + c["updated_at"] = nowMs + interfaceClients[i] = c + foundEmails[targetEmail] = true + } + + for _, email := range keyToEmail { + if !foundEmails[email] { + res.perEmailSkipped[email] = "Client Not Found In Inbound" + } + } + + if len(foundEmails) == 0 { + return res + } + + settings["clients"] = interfaceClients + newSettings, err := json.MarshalIndent(settings, "", " ") + if err != nil { + for email := range foundEmails { + res.perEmailSkipped[email] = err.Error() + } + return res + } + oldInbound.Settings = string(newSettings) + + if oldInbound.NodeID != nil { + rt, rterr := inboundSvc.runtimeFor(oldInbound) + if rterr != nil { + for email := range foundEmails { + res.perEmailSkipped[email] = rterr.Error() + delete(foundEmails, email) + } + } else { + for email := range foundEmails { + entry := plan[email] + updated := *entry.record.ToClient() + if entry.applyExpiry { + updated.ExpiryTime = entry.newExpiry + } + if entry.applyTotal { + updated.TotalGB = entry.newTotal + } + updated.UpdatedAt = nowMs + if err1 := rt.UpdateUser(context.Background(), oldInbound, email, updated); err1 != nil { + res.perEmailSkipped[email] = err1.Error() + delete(foundEmails, email) + } + } + } + } + + db := database.GetDB() + if err := db.Save(oldInbound).Error; err != nil { + for email := range foundEmails { + if _, skip := res.perEmailSkipped[email]; !skip { + res.perEmailSkipped[email] = err.Error() + } + } + return res + } + + finalClients, gcErr := inboundSvc.GetClients(oldInbound) + if gcErr == nil { + if syncErr := s.SyncInbound(db, inboundId, finalClients); syncErr != nil { + logger.Warning("bulkAdjust SyncInbound:", syncErr) + } + } + + return res +} + // BulkDeleteResult mirrors BulkAdjustResult: total deleted plus per-email // skip reasons when an email could not be processed. type BulkDeleteResult struct {