perf(clients): batch BulkAdjust per inbound, skip no-op xray calls on local

Same per-inbound batching strategy as BulkDelete. The previous code
called Update once per email, which itself looped through each inbound
the client belonged to — reparsing the same settings JSON, calling
RemoveUser+AddUser on xray, and running SyncInbound for every single
email. For 200 emails in one inbound that's 200 JSON read/write cycles
and 400 xray runtime calls.

The new BulkAdjust groups emails by inbound and per inbound:

- locks once, reads settings JSON once
- mutates expiryTime/totalGB in place for every target client
- writes the inbound and runs SyncInbound once

ClientTraffic rows are updated with a single per-email query at the end
(values differ per client so they can't be folded into one statement).

For local-node inbounds the xray runtime calls are skipped entirely.
The AddUser payload only contains email/id/security/flow/auth/password/
cipher — none of which change in an adjust — so RemoveUser+AddUser was
a no-op that briefly flapped active users. Limit enforcement is driven
by the panel's traffic loop reading ClientTraffic, not by xray-core.

For remote-node inbounds rt.UpdateUser is preserved so the remote panel
receives the new totals/expiry.

Skip+report semantics match BulkDelete: any per-email error leaves that
email's record/traffic untouched and is returned in Skipped[].
This commit is contained in:
MHSanaei
2026-05-27 00:30:40 +02:00
parent e0e6200e2f
commit 15787dbdfe

View File

@@ -1185,86 +1185,341 @@ type BulkAdjustReport struct {
Reason string `json:"reason"`
}
type bulkAdjustEntry struct {
record *model.ClientRecord
applyExpiry bool
newExpiry int64
applyTotal bool
newTotal int64
}
// BulkAdjust shifts ExpiryTime by addDays (days) and TotalGB by addBytes
// for every email in the list. Clients whose corresponding field is
// unlimited (0) are skipped — bulk extend should not accidentally
// limit an unlimited client. addDays and addBytes may be negative.
//
// Like BulkDelete, the work is grouped by inbound so each inbound's
// settings JSON is parsed and written exactly once regardless of how
// many target emails it contains.
func (s *ClientService) BulkAdjust(inboundSvc *InboundService, emails []string, addDays int, addBytes int64) (BulkAdjustResult, bool, error) {
result := BulkAdjustResult{}
needRestart := false
if len(emails) == 0 {
return result, needRestart, nil
return result, false, nil
}
if addDays == 0 && addBytes == 0 {
return result, needRestart, common.NewError("no adjustment specified")
return result, false, common.NewError("no adjustment specified")
}
addExpiryMs := int64(addDays) * 24 * 60 * 60 * 1000
for _, email := range emails {
email = strings.TrimSpace(email)
if email == "" {
seen := map[string]struct{}{}
cleanEmails := make([]string, 0, len(emails))
for _, e := range emails {
e = strings.TrimSpace(e)
if e == "" {
continue
}
rec, err := s.GetRecordByEmail(nil, email)
if err != nil {
result.Skipped = append(result.Skipped, BulkAdjustReport{Email: email, Reason: err.Error()})
if _, ok := seen[e]; ok {
continue
}
client := rec.ToClient()
seen[e] = struct{}{}
cleanEmails = append(cleanEmails, e)
}
if len(cleanEmails) == 0 {
return result, false, nil
}
applied := false
db := database.GetDB()
var records []model.ClientRecord
if err := db.Where("email IN ?", cleanEmails).Find(&records).Error; err != nil {
return result, false, err
}
recordsByEmail := make(map[string]*model.ClientRecord, len(records))
for i := range records {
recordsByEmail[records[i].Email] = &records[i]
}
skippedReasons := map[string]string{}
for _, email := range cleanEmails {
if _, ok := recordsByEmail[email]; !ok {
skippedReasons[email] = "client not found"
}
}
plan := map[string]*bulkAdjustEntry{}
for email, rec := range recordsByEmail {
entry := &bulkAdjustEntry{record: rec}
if addDays != 0 {
switch {
case rec.ExpiryTime == 0:
result.Skipped = append(result.Skipped, BulkAdjustReport{Email: email, Reason: "unlimited expiry"})
if _, exists := skippedReasons[email]; !exists {
skippedReasons[email] = "unlimited expiry"
}
case rec.ExpiryTime > 0:
next := rec.ExpiryTime + addExpiryMs
if next <= 0 {
result.Skipped = append(result.Skipped, BulkAdjustReport{Email: email, Reason: "reduction exceeds remaining time"})
if _, exists := skippedReasons[email]; !exists {
skippedReasons[email] = "reduction exceeds remaining time"
}
} else {
client.ExpiryTime = next
applied = true
entry.applyExpiry = true
entry.newExpiry = next
}
default:
next := rec.ExpiryTime - addExpiryMs
if next >= 0 {
result.Skipped = append(result.Skipped, BulkAdjustReport{Email: email, Reason: "reduction exceeds delay window"})
if _, exists := skippedReasons[email]; !exists {
skippedReasons[email] = "reduction exceeds delay window"
}
} else {
client.ExpiryTime = next
applied = true
entry.applyExpiry = true
entry.newExpiry = next
}
}
}
if addBytes != 0 {
if rec.TotalGB == 0 {
result.Skipped = append(result.Skipped, BulkAdjustReport{Email: email, Reason: "unlimited traffic"})
if _, exists := skippedReasons[email]; !exists {
skippedReasons[email] = "unlimited traffic"
}
} else {
next := rec.TotalGB + addBytes
if next < 0 {
next = 0
}
client.TotalGB = next
applied = true
entry.applyTotal = true
entry.newTotal = next
}
}
if !applied {
continue
if entry.applyExpiry || entry.applyTotal {
plan[email] = entry
}
}
nr, err := s.Update(inboundSvc, rec.Id, *client)
if err != nil {
result.Skipped = append(result.Skipped, BulkAdjustReport{Email: email, Reason: err.Error()})
if len(plan) == 0 {
for email, reason := range skippedReasons {
result.Skipped = append(result.Skipped, BulkAdjustReport{Email: email, Reason: reason})
}
return result, false, nil
}
plannedIds := make([]int, 0, len(plan))
recordIdToEmail := make(map[int]string, len(plan))
for email, entry := range plan {
plannedIds = append(plannedIds, entry.record.Id)
recordIdToEmail[entry.record.Id] = email
}
var mappings []model.ClientInbound
if err := db.Where("client_id IN ?", plannedIds).Find(&mappings).Error; err != nil {
return result, false, err
}
emailsByInbound := map[int][]string{}
for _, m := range mappings {
email, ok := recordIdToEmail[m.ClientId]
if !ok {
continue
}
if nr {
emailsByInbound[m.InboundId] = append(emailsByInbound[m.InboundId], email)
}
needRestart := false
for inboundId, ibEmails := range emailsByInbound {
ibRes := s.bulkAdjustInboundClients(inboundSvc, inboundId, ibEmails, plan)
if ibRes.needRestart {
needRestart = true
}
for email, reason := range ibRes.perEmailSkipped {
if _, already := skippedReasons[email]; !already {
skippedReasons[email] = reason
}
}
}
for email, entry := range plan {
if _, skipped := skippedReasons[email]; skipped {
continue
}
updates := map[string]any{}
if entry.applyExpiry {
updates["expiry_time"] = entry.newExpiry
}
if entry.applyTotal {
updates["total"] = entry.newTotal
}
if len(updates) == 0 {
continue
}
if err := db.Model(xray.ClientTraffic{}).Where("email = ?", email).Updates(updates).Error; err != nil {
if _, already := skippedReasons[email]; !already {
skippedReasons[email] = err.Error()
}
continue
}
result.Adjusted++
}
for email, reason := range skippedReasons {
result.Skipped = append(result.Skipped, BulkAdjustReport{Email: email, Reason: reason})
}
return result, needRestart, nil
}
type bulkInboundAdjustResult struct {
perEmailSkipped map[string]string
needRestart bool
}
// bulkAdjustInboundClients applies expiry/total deltas to multiple clients
// inside a single inbound's settings JSON. The xray runtime is updated
// only for remote-node inbounds; local nodes do not need a notification
// because the AddUser payload does not include totalGB/expiryTime —
// changing those fields is identity-preserving and the panel's traffic
// enforcement loop picks up the new limits from ClientTraffic directly.
func (s *ClientService) bulkAdjustInboundClients(
inboundSvc *InboundService,
inboundId int,
emails []string,
plan map[string]*bulkAdjustEntry,
) bulkInboundAdjustResult {
res := bulkInboundAdjustResult{perEmailSkipped: map[string]string{}}
defer lockInbound(inboundId).Unlock()
oldInbound, err := inboundSvc.GetInbound(inboundId)
if err != nil {
logger.Error("Load Old Data Error")
for _, e := range emails {
res.perEmailSkipped[e] = err.Error()
}
return res
}
var settings map[string]any
if err := json.Unmarshal([]byte(oldInbound.Settings), &settings); err != nil {
for _, e := range emails {
res.perEmailSkipped[e] = err.Error()
}
return res
}
clientKey := "id"
switch oldInbound.Protocol {
case model.Trojan:
clientKey = "password"
case model.Shadowsocks:
clientKey = "email"
case model.Hysteria, model.Hysteria2:
clientKey = "auth"
}
keyToEmail := make(map[string]string, len(emails))
for _, email := range emails {
entry := plan[email]
if entry == nil {
res.perEmailSkipped[email] = "client not found"
continue
}
key := clientKeyForProtocol(oldInbound.Protocol, entry.record)
if key == "" {
res.perEmailSkipped[email] = "missing client key for protocol"
continue
}
keyToEmail[key] = email
}
interfaceClients, _ := settings["clients"].([]any)
foundEmails := map[string]bool{}
nowMs := time.Now().Unix() * 1000
for i, client := range interfaceClients {
c, ok := client.(map[string]any)
if !ok {
continue
}
cKey, _ := c[clientKey].(string)
targetEmail, found := keyToEmail[cKey]
if !found {
continue
}
entry := plan[targetEmail]
if entry.applyExpiry {
c["expiryTime"] = entry.newExpiry
}
if entry.applyTotal {
c["totalGB"] = entry.newTotal
}
c["updated_at"] = nowMs
interfaceClients[i] = c
foundEmails[targetEmail] = true
}
for _, email := range keyToEmail {
if !foundEmails[email] {
res.perEmailSkipped[email] = "Client Not Found In Inbound"
}
}
if len(foundEmails) == 0 {
return res
}
settings["clients"] = interfaceClients
newSettings, err := json.MarshalIndent(settings, "", " ")
if err != nil {
for email := range foundEmails {
res.perEmailSkipped[email] = err.Error()
}
return res
}
oldInbound.Settings = string(newSettings)
if oldInbound.NodeID != nil {
rt, rterr := inboundSvc.runtimeFor(oldInbound)
if rterr != nil {
for email := range foundEmails {
res.perEmailSkipped[email] = rterr.Error()
delete(foundEmails, email)
}
} else {
for email := range foundEmails {
entry := plan[email]
updated := *entry.record.ToClient()
if entry.applyExpiry {
updated.ExpiryTime = entry.newExpiry
}
if entry.applyTotal {
updated.TotalGB = entry.newTotal
}
updated.UpdatedAt = nowMs
if err1 := rt.UpdateUser(context.Background(), oldInbound, email, updated); err1 != nil {
res.perEmailSkipped[email] = err1.Error()
delete(foundEmails, email)
}
}
}
}
db := database.GetDB()
if err := db.Save(oldInbound).Error; err != nil {
for email := range foundEmails {
if _, skip := res.perEmailSkipped[email]; !skip {
res.perEmailSkipped[email] = err.Error()
}
}
return res
}
finalClients, gcErr := inboundSvc.GetClients(oldInbound)
if gcErr == nil {
if syncErr := s.SyncInbound(db, inboundId, finalClients); syncErr != nil {
logger.Warning("bulkAdjust SyncInbound:", syncErr)
}
}
return res
}
// BulkDeleteResult mirrors BulkAdjustResult: total deleted plus per-email
// skip reasons when an email could not be processed.
type BulkDeleteResult struct {