From 4f597a08c49884da2d656f997f6ef5f2e4affbd4 Mon Sep 17 00:00:00 2001 From: MHSanaei Date: Tue, 2 Jun 2026 03:59:10 +0200 Subject: [PATCH] perf(clients): batch bulk attach/detach to cut per-item DB work BulkDetach removed one client per (email x inbound) pair, each with its own settings rewrite, transaction and full SyncInbound. Add delInboundClients to remove all targeted clients from an inbound in a single pass and group removals by inbound, turning O(emails x inbounds) write cycles into O(inbounds). BulkAttach ran the global getAllEmailSubIDs scan once per target inbound via checkEmailsExistForClients. Compute that snapshot once per call and thread it through a new internal addInboundClient; the duplicate check is unaffected because attach reuses each client's existing identity (same subId). Covered by bulk_clients_test.go: VLESS round-trip (linkage, settings JSON, idempotency, record survival), skip-unattached, and Trojan key matching. --- web/service/bulk_clients_test.go | 244 +++++++++++++++++++++++++++++++ web/service/client.go | 212 +++++++++++++++++++++++++-- web/service/inbound.go | 2 +- 3 files changed, 443 insertions(+), 15 deletions(-) create mode 100644 web/service/bulk_clients_test.go diff --git a/web/service/bulk_clients_test.go b/web/service/bulk_clients_test.go new file mode 100644 index 00000000..101a813e --- /dev/null +++ b/web/service/bulk_clients_test.go @@ -0,0 +1,244 @@ +package service + +import ( + "encoding/json" + "path/filepath" + "sort" + "testing" + + "github.com/mhsanaei/3x-ui/v3/database" + "github.com/mhsanaei/3x-ui/v3/database/model" +) + +func setupBulkDB(t *testing.T) { + t.Helper() + dbDir := t.TempDir() + t.Setenv("XUI_DB_FOLDER", dbDir) + if err := database.InitDB(filepath.Join(dbDir, "x-ui.db")); err != nil { + t.Fatalf("InitDB: %v", err) + } + t.Cleanup(func() { _ = database.CloseDB() }) +} + +func clientsSettings(t *testing.T, clients []model.Client) string { + t.Helper() + b, err := json.Marshal(map[string][]model.Client{"clients": clients}) + if err != nil { + t.Fatalf("marshal settings: %v", err) + } + return string(b) +} + +func emailsOf(clients []model.Client) []string { + out := make([]string, 0, len(clients)) + for _, c := range clients { + out = append(out, c.Email) + } + return out +} + +func sortedEmails(list []model.Client) []string { + out := emailsOf(list) + sort.Strings(out) + return out +} + +func mkInbound(t *testing.T, port int, proto model.Protocol, settings string) *model.Inbound { + t.Helper() + ib := &model.Inbound{ + Tag: string(proto) + "-" + filepath.Base(t.TempDir()), + Enable: true, + Port: port, + Protocol: proto, + Settings: settings, + } + if err := database.GetDB().Create(ib).Error; err != nil { + t.Fatalf("create inbound %d: %v", port, err) + } + return ib +} + +// TestBulkAttachDetach_VLESS exercises the batched attach/detach round-trip on +// VLESS inbounds: linkage, settings JSON, idempotency, skip, and record survival. +func TestBulkAttachDetach_VLESS(t *testing.T) { + setupBulkDB(t) + svc := &ClientService{} + inboundSvc := &InboundService{} + + source := []model.Client{ + {Email: "alice@x", ID: "11111111-1111-1111-1111-111111111111", SubID: "sa", Enable: true}, + {Email: "bob@x", ID: "22222222-2222-2222-2222-222222222222", SubID: "sb", Enable: true}, + {Email: "carol@x", ID: "33333333-3333-3333-3333-333333333333", SubID: "sc", Enable: true}, + } + + ib1 := mkInbound(t, 20001, model.VLESS, clientsSettings(t, source)) + ib2 := mkInbound(t, 20002, model.VLESS, `{"clients":[]}`) + ib3 := mkInbound(t, 20003, model.VLESS, `{"clients":[]}`) + + if err := svc.SyncInbound(nil, ib1.Id, source); err != nil { + t.Fatalf("seed source linkage: %v", err) + } + + emails := emailsOf(source) + + res, _, err := svc.BulkAttach(inboundSvc, emails, []int{ib2.Id, ib3.Id}) + if err != nil { + t.Fatalf("BulkAttach: %v", err) + } + if len(res.Errors) != 0 { + t.Fatalf("BulkAttach errors: %v", res.Errors) + } + if len(res.Skipped) != 0 { + t.Fatalf("BulkAttach skipped unexpectedly: %v", res.Skipped) + } + if len(res.Attached) != 6 { + t.Fatalf("expected 6 attach entries (3 clients x 2 inbounds), got %d: %v", len(res.Attached), res.Attached) + } + + for _, ib := range []*model.Inbound{ib2, ib3} { + list, err := svc.ListForInbound(nil, ib.Id) + if err != nil { + t.Fatalf("ListForInbound(%d): %v", ib.Id, err) + } + if got := sortedEmails(list); len(got) != 3 { + t.Fatalf("inbound %d: expected 3 linked clients, got %v", ib.Id, got) + } + reloaded, err := inboundSvc.GetInbound(ib.Id) + if err != nil { + t.Fatalf("GetInbound(%d): %v", ib.Id, err) + } + jsonClients, err := inboundSvc.GetClients(reloaded) + if err != nil { + t.Fatalf("GetClients(%d): %v", ib.Id, err) + } + if len(jsonClients) != 3 { + t.Fatalf("inbound %d settings JSON: expected 3 clients, got %d", ib.Id, len(jsonClients)) + } + } + + res2, _, err := svc.BulkAttach(inboundSvc, emails, []int{ib2.Id, ib3.Id}) + if err != nil { + t.Fatalf("BulkAttach (idempotent): %v", err) + } + if len(res2.Attached) != 0 { + t.Fatalf("re-attach should add nothing, got Attached=%v", res2.Attached) + } + if len(res2.Skipped) != 6 { + t.Fatalf("re-attach should skip all 6, got Skipped=%v", res2.Skipped) + } + + dres, _, err := svc.BulkDetach(inboundSvc, emails, []int{ib2.Id, ib3.Id}) + if err != nil { + t.Fatalf("BulkDetach: %v", err) + } + if len(dres.Errors) != 0 { + t.Fatalf("BulkDetach errors: %v", dres.Errors) + } + if len(dres.Detached) != 3 { + t.Fatalf("expected 3 detached emails, got %v", dres.Detached) + } + + for _, ib := range []*model.Inbound{ib2, ib3} { + list, err := svc.ListForInbound(nil, ib.Id) + if err != nil { + t.Fatalf("ListForInbound after detach(%d): %v", ib.Id, err) + } + if len(list) != 0 { + t.Fatalf("inbound %d should have no clients after detach, got %v", ib.Id, sortedEmails(list)) + } + reloaded, _ := inboundSvc.GetInbound(ib.Id) + jsonClients, _ := inboundSvc.GetClients(reloaded) + if len(jsonClients) != 0 { + t.Fatalf("inbound %d settings JSON should be empty after detach, got %d", ib.Id, len(jsonClients)) + } + } + + for _, e := range emails { + rec, err := svc.GetRecordByEmail(nil, e) + if err != nil { + t.Fatalf("record %q should survive detach: %v", e, err) + } + ids, err := svc.GetInboundIdsForRecord(rec.Id) + if err != nil { + t.Fatalf("GetInboundIdsForRecord(%q): %v", e, err) + } + if len(ids) != 1 || ids[0] != ib1.Id { + t.Fatalf("record %q should remain attached only to source inbound %d, got %v", e, ib1.Id, ids) + } + } +} + +// TestBulkDetach_SkipsUnattached verifies emails not on any requested inbound +// land in Skipped, not Detached, and produce no error. +func TestBulkDetach_SkipsUnattached(t *testing.T) { + setupBulkDB(t) + svc := &ClientService{} + inboundSvc := &InboundService{} + + source := []model.Client{ + {Email: "only-on-1@x", ID: "44444444-4444-4444-4444-444444444444", SubID: "s1", Enable: true}, + } + ib1 := mkInbound(t, 21001, model.VLESS, clientsSettings(t, source)) + ib2 := mkInbound(t, 21002, model.VLESS, `{"clients":[]}`) + if err := svc.SyncInbound(nil, ib1.Id, source); err != nil { + t.Fatalf("seed: %v", err) + } + + dres, restart, err := svc.BulkDetach(inboundSvc, []string{"only-on-1@x"}, []int{ib2.Id}) + if err != nil { + t.Fatalf("BulkDetach: %v", err) + } + if restart { + t.Fatalf("no-op detach should not require restart") + } + if len(dres.Detached) != 0 { + t.Fatalf("nothing should be detached, got %v", dres.Detached) + } + if len(dres.Skipped) != 1 || dres.Skipped[0] != "only-on-1@x" { + t.Fatalf("expected the email in Skipped, got %v", dres.Skipped) + } + if len(dres.Errors) != 0 { + t.Fatalf("unexpected errors: %v", dres.Errors) + } +} + +// TestBulkAttachDetach_Trojan checks the protocol-specific key matching in the +// batched detach path (Trojan keys on password, not id). +func TestBulkAttachDetach_Trojan(t *testing.T) { + setupBulkDB(t) + svc := &ClientService{} + inboundSvc := &InboundService{} + + source := []model.Client{ + {Email: "t1@x", Password: "pw-t1", SubID: "t1", Enable: true}, + {Email: "t2@x", Password: "pw-t2", SubID: "t2", Enable: true}, + } + ib1 := mkInbound(t, 22001, model.Trojan, clientsSettings(t, source)) + ib2 := mkInbound(t, 22002, model.Trojan, `{"clients":[]}`) + if err := svc.SyncInbound(nil, ib1.Id, source); err != nil { + t.Fatalf("seed: %v", err) + } + + emails := emailsOf(source) + if res, _, err := svc.BulkAttach(inboundSvc, emails, []int{ib2.Id}); err != nil { + t.Fatalf("BulkAttach: %v", err) + } else if len(res.Errors) != 0 || len(res.Attached) != 2 { + t.Fatalf("attach result unexpected: attached=%v errors=%v", res.Attached, res.Errors) + } + + list, _ := svc.ListForInbound(nil, ib2.Id) + if len(list) != 2 { + t.Fatalf("expected 2 trojan clients on ib2, got %v", sortedEmails(list)) + } + + dres, _, err := svc.BulkDetach(inboundSvc, emails, []int{ib2.Id}) + if err != nil { + t.Fatalf("BulkDetach: %v", err) + } + if len(dres.Detached) != 2 || len(dres.Errors) != 0 { + t.Fatalf("detach result unexpected: detached=%v errors=%v", dres.Detached, dres.Errors) + } + if list, _ := svc.ListForInbound(nil, ib2.Id); len(list) != 0 { + t.Fatalf("trojan clients should be gone from ib2, got %v", sortedEmails(list)) + } +} diff --git a/web/service/client.go b/web/service/client.go index 56588b44..9d35a552 100644 --- a/web/service/client.go +++ b/web/service/client.go @@ -883,6 +883,12 @@ func (s *ClientService) BulkAttach(inboundSvc *InboundService, emails []string, records = append(records, rec) } + emailSubIDs, sidErr := inboundSvc.getAllEmailSubIDs() + if sidErr != nil { + emailSubIDs = nil + logger.Warningf("[BulkAttach] getAllEmailSubIDs: %v", sidErr) + } + needRestart := false for _, ibId := range inboundIds { inbound, err := inboundSvc.GetInbound(ibId) @@ -924,7 +930,7 @@ func (s *ClientService) BulkAttach(inboundSvc *InboundService, emails []string, recordErr("inbound %d: %v", ibId, err) continue } - nr, err := s.AddInboundClient(inboundSvc, &model.Inbound{Id: ibId, Settings: string(payload)}) + nr, err := s.addInboundClient(inboundSvc, &model.Inbound{Id: ibId, Settings: string(payload)}, emailSubIDs) if err != nil { recordErr("inbound %d: %v", ibId, err) continue @@ -969,7 +975,10 @@ func (s *ClientService) BulkDetach(inboundSvc *InboundService, emails []string, requested[id] = struct{}{} } - needRestart := false + recsByInbound := make(map[int][]*model.ClientRecord) + emailOrder := make([]string, 0, len(emails)) + emailRepr := make(map[string]string, len(emails)) + emailFailed := make(map[string]bool, len(emails)) seenEmail := make(map[string]struct{}, len(emails)) for _, email := range emails { if email == "" { @@ -991,30 +1000,194 @@ func (s *ClientService) BulkDetach(inboundSvc *InboundService, emails []string, recordErr("%s: %v", email, err) continue } - intersection := make([]int, 0, len(currentIds)) + matched := false for _, id := range currentIds { if _, ok := requested[id]; ok { - intersection = append(intersection, id) + recsByInbound[id] = append(recsByInbound[id], rec) + matched = true } } - if len(intersection) == 0 { + if !matched { result.Skipped = append(result.Skipped, rec.Email) continue } - nr, err := s.Detach(inboundSvc, rec.Id, intersection) + emailOrder = append(emailOrder, key) + emailRepr[key] = rec.Email + } + + needRestart := false + for _, ibId := range inboundIds { + recs, ok := recsByInbound[ibId] + if !ok { + continue + } + delete(recsByInbound, ibId) + nr, err := s.delInboundClients(inboundSvc, ibId, recs, true) if err != nil { - recordErr("%s: %v", rec.Email, err) + recordErr("inbound %d: %v", ibId, err) + for _, rec := range recs { + emailFailed[strings.ToLower(rec.Email)] = true + } continue } if nr { needRestart = true } - result.Detached = append(result.Detached, rec.Email) + } + + for _, key := range emailOrder { + if emailFailed[key] { + continue + } + result.Detached = append(result.Detached, emailRepr[key]) } return result, needRestart, nil } +// delInboundClients removes several clients from a single inbound in one pass: +// one settings rewrite, one runtime sweep, one Save and one SyncInbound for the +// whole batch, instead of repeating the full per-client cycle. It mirrors the +// semantics of DelInboundClient for each removed client. needRestart is the OR +// across all removals. +func (s *ClientService) delInboundClients(inboundSvc *InboundService, inboundId int, recs []*model.ClientRecord, keepTraffic bool) (bool, error) { + if len(recs) == 0 { + return false, nil + } + defer lockInbound(inboundId).Unlock() + + oldInbound, err := inboundSvc.GetInbound(inboundId) + if err != nil { + logger.Error("Load Old Data Error") + return false, err + } + + var settings map[string]any + if err := json.Unmarshal([]byte(oldInbound.Settings), &settings); err != nil { + return false, err + } + + clientKey := "id" + switch oldInbound.Protocol { + case "trojan": + clientKey = "password" + case "shadowsocks": + clientKey = "email" + case "hysteria": + clientKey = "auth" + } + + wanted := make(map[string]struct{}, len(recs)) + for _, rec := range recs { + if k := clientKeyForProtocol(oldInbound.Protocol, rec); k != "" { + wanted[k] = struct{}{} + } + } + + interfaceClients, ok := settings["clients"].([]any) + if !ok { + return false, common.NewError("invalid clients format in inbound settings") + } + + type removedClient struct { + email string + needApiDel bool + } + removed := make([]removedClient, 0, len(wanted)) + newClients := make([]any, 0, len(interfaceClients)) + for _, client := range interfaceClients { + c, ok := client.(map[string]any) + if !ok { + newClients = append(newClients, client) + continue + } + cid, _ := c[clientKey].(string) + if _, hit := wanted[cid]; hit && cid != "" { + email, _ := c["email"].(string) + enable, _ := c["enable"].(bool) + removed = append(removed, removedClient{email: email, needApiDel: enable}) + continue + } + newClients = append(newClients, client) + } + + if len(removed) == 0 { + return false, nil + } + + db := database.GetDB() + newClients = compactOrphans(db, newClients) + if newClients == nil { + newClients = []any{} + } + settings["clients"] = newClients + newSettings, err := json.MarshalIndent(settings, "", " ") + if err != nil { + return false, err + } + oldInbound.Settings = string(newSettings) + + needRestart := false + for _, r := range removed { + email := r.email + emailShared, err := inboundSvc.emailUsedByOtherInbounds(email, inboundId) + if err != nil { + return needRestart, err + } + if !emailShared && !keepTraffic { + if err := inboundSvc.DelClientIPs(db, email); err != nil { + logger.Error("Error in delete client IPs") + return needRestart, err + } + } + if len(email) > 0 { + var enables []bool + if err := db.Model(xray.ClientTraffic{}).Where("email = ?", email).Limit(1).Pluck("enable", &enables).Error; err != nil { + logger.Error("Get stats error") + return needRestart, err + } + notDepleted := len(enables) > 0 && enables[0] + if !emailShared && !keepTraffic { + if err := inboundSvc.DelClientStat(db, email); err != nil { + logger.Error("Delete stats Data Error") + return needRestart, err + } + } + if r.needApiDel && notDepleted && oldInbound.NodeID == nil { + rt, rterr := inboundSvc.runtimeFor(oldInbound) + if rterr != nil { + needRestart = true + } else if err1 := rt.RemoveUser(context.Background(), oldInbound, email); err1 != nil { + if !strings.Contains(err1.Error(), fmt.Sprintf("User %s not found.", email)) { + needRestart = true + } + } + } + } + if oldInbound.NodeID != nil && len(email) > 0 { + rt, rterr := inboundSvc.runtimeFor(oldInbound) + if rterr != nil { + return needRestart, rterr + } + if err1 := rt.DeleteUser(context.Background(), oldInbound, email); err1 != nil { + return needRestart, err1 + } + } + } + + if err := db.Save(oldInbound).Error; err != nil { + return needRestart, err + } + finalClients, gcErr := inboundSvc.GetClients(oldInbound) + if gcErr != nil { + return needRestart, gcErr + } + if err := s.SyncInbound(db, inboundId, finalClients); err != nil { + return needRestart, err + } + return needRestart, nil +} + func (s *ClientService) DetachByEmailMany(inboundSvc *InboundService, email string, inboundIds []int) (bool, error) { if email == "" { return false, common.NewError("client email is required") @@ -2881,10 +3054,13 @@ func (s *ClientService) Detach(inboundSvc *InboundService, id int, inboundIds [] return needRestart, nil } -func (s *ClientService) checkEmailsExistForClients(inboundSvc *InboundService, clients []model.Client) (string, error) { - emailSubIDs, err := inboundSvc.getAllEmailSubIDs() - if err != nil { - return "", err +func (s *ClientService) checkEmailsExistForClients(inboundSvc *InboundService, clients []model.Client, emailSubIDs map[string]string) (string, error) { + if emailSubIDs == nil { + var err error + emailSubIDs, err = inboundSvc.getAllEmailSubIDs() + if err != nil { + return "", err + } } seen := make(map[string]string, len(clients)) for _, client := range clients { @@ -2909,6 +3085,14 @@ func (s *ClientService) checkEmailsExistForClients(inboundSvc *InboundService, c } func (s *ClientService) AddInboundClient(inboundSvc *InboundService, data *model.Inbound) (bool, error) { + return s.addInboundClient(inboundSvc, data, nil) +} + +// addInboundClient is AddInboundClient with an optional precomputed email→subId +// map. Bulk callers pass a single snapshot so the global getAllEmailSubIDs scan +// runs once for the whole batch instead of once per target inbound; a nil map +// makes it compute its own (the single-add path). +func (s *ClientService) addInboundClient(inboundSvc *InboundService, data *model.Inbound, emailSubIDs map[string]string) (bool, error) { defer lockInbound(data.Id).Unlock() clients, err := inboundSvc.GetClients(data) @@ -2937,7 +3121,7 @@ func (s *ClientService) AddInboundClient(inboundSvc *InboundService, data *model interfaceClients[i] = cm } } - existEmail, err := s.checkEmailsExistForClients(inboundSvc, clients) + existEmail, err := s.checkEmailsExistForClients(inboundSvc, clients, emailSubIDs) if err != nil { return false, err } @@ -3156,7 +3340,7 @@ func (s *ClientService) UpdateInboundClient(inboundSvc *InboundService, data *mo } if clients[0].Email != oldEmail { - existEmail, err := s.checkEmailsExistForClients(inboundSvc, clients) + existEmail, err := s.checkEmailsExistForClients(inboundSvc, clients, nil) if err != nil { return false, err } diff --git a/web/service/inbound.go b/web/service/inbound.go index 5aebf3b1..f8a0e918 100644 --- a/web/service/inbound.go +++ b/web/service/inbound.go @@ -479,7 +479,7 @@ func (s *InboundService) AddInbound(inbound *model.Inbound) (*model.Inbound, boo if err != nil { return inbound, false, err } - existEmail, err := s.clientService.checkEmailsExistForClients(s, clients) + existEmail, err := s.clientService.checkEmailsExistForClients(s, clients, nil) if err != nil { return inbound, false, err }