feat(clients): server-side bulk create/delete with per-inbound batching

Replace the panel-side fan-out (Promise.all of single /add and /del
calls) that raced on the shared inbound config and capped throughput at
roughly one round-trip per client. New endpoints batch the work on the
server:

- POST /panel/api/clients/bulkDel  { emails, keepTraffic }
- POST /panel/api/clients/bulkCreate  [ {client, inboundIds}, ... ]

BulkDelete groups emails by inbound and performs a single
read-modify-write per inbound (one JSON parse, one marshal, one Save)
instead of N. Per-row DB cleanups (ClientInbound, ClientTraffic,
InboundClientIps, ClientRecord) are batched with WHERE...IN queries.
Per-email failures are reported via Skipped[] and processing continues.

BulkCreate iterates payloads sequentially through the same Create path
single-add uses, so heterogeneous batches (different inboundIds, plans)
remain valid in one round-trip.

Frontend bulkDelete/bulkCreate hooks parse the new response shape
({ deleted|created, skipped[] }) and the bulk-add modal now posts a
single request instead of fanning out emails.
This commit is contained in:
MHSanaei
2026-05-27 00:20:52 +02:00
parent 989333b0b1
commit e0e6200e2f
8 changed files with 629 additions and 49 deletions

View File

@@ -2669,6 +2669,142 @@
}
}
},
"/panel/api/clients/bulkDel": {
"post": {
"tags": [
"Clients"
],
"summary": "Delete many clients in one call. The server processes the list sequentially so each delete sees the committed state of the previous one — avoids the race the per-email fan-out had on the panel side. Pass keepTraffic=true to retain the xray_client_traffic rows after deletion.",
"operationId": "post_panel_api_clients_bulkDel",
"requestBody": {
"required": true,
"content": {
"application/json": {
"schema": {
"type": "object"
},
"example": {
"emails": [
"alice",
"bob"
],
"keepTraffic": false
}
}
}
},
"responses": {
"200": {
"description": "Successful response",
"content": {
"application/json": {
"schema": {
"type": "object",
"properties": {
"success": {
"type": "boolean"
},
"msg": {
"type": "string"
},
"obj": {}
}
},
"example": {
"success": true,
"obj": {
"deleted": 2,
"skipped": [
{
"email": "carol",
"reason": "client not found"
}
]
}
}
}
}
}
}
}
},
"/panel/api/clients/bulkCreate": {
"post": {
"tags": [
"Clients"
],
"summary": "Create many clients in one call. Body is a JSON array of {client, inboundIds} payloads — the same shape /add accepts. Items are processed sequentially; per-email skip reasons are returned for items that fail (e.g., duplicate email). Triggers a single Xray restart at the end if any inbound was running.",
"operationId": "post_panel_api_clients_bulkCreate",
"requestBody": {
"required": true,
"content": {
"application/json": {
"schema": {
"type": "object"
},
"example": [
{
"client": {
"email": "alice@example.com",
"totalGB": 53687091200,
"expiryTime": 0,
"enable": true
},
"inboundIds": [
7
]
},
{
"client": {
"email": "bob@example.com",
"totalGB": 53687091200,
"expiryTime": 0,
"enable": true
},
"inboundIds": [
7,
9
]
}
]
}
}
},
"responses": {
"200": {
"description": "Successful response",
"content": {
"application/json": {
"schema": {
"type": "object",
"properties": {
"success": {
"type": "boolean"
},
"msg": {
"type": "string"
},
"obj": {}
}
},
"example": {
"success": true,
"obj": {
"created": 2,
"skipped": [
{
"email": "alice@example.com",
"reason": "email already in use"
}
]
}
}
}
}
}
}
}
},
"/panel/api/clients/resetTraffic/{email}": {
"post": {
"tags": [

View File

@@ -10,6 +10,8 @@ import {
InboundOptionsSchema,
OnlinesSchema,
BulkAdjustResultSchema,
BulkCreateResultSchema,
BulkDeleteResultSchema,
DelDepletedResultSchema,
type ClientHydrate,
type ClientRecord,
@@ -18,6 +20,8 @@ import {
type ClientPageResponse,
type InboundOption,
type BulkAdjustResult,
type BulkCreateResult,
type BulkDeleteResult,
} from '@/schemas/client';
import { DefaultsPayloadSchema } from '@/schemas/defaults';
@@ -209,18 +213,20 @@ export function useClients() {
onSuccess: (msg) => { if (msg?.success) invalidateAll(); },
});
const removeManyMut = useMutation({
mutationFn: async ({ emails, keepTraffic }: { emails: string[]; keepTraffic?: boolean }) => {
const suffix = keepTraffic ? '?keepTraffic=1' : '';
const results: Msg<unknown>[] = [];
for (const email of emails) {
const url = `/panel/api/clients/del/${encodeURIComponent(email)}${suffix}`;
const res = await HttpUtil.post(url, undefined, { silent: true });
results.push(res);
}
return results;
const bulkDeleteMut = useMutation({
mutationFn: async (payload: { emails: string[]; keepTraffic?: boolean }): Promise<Msg<BulkDeleteResult>> => {
const raw = await HttpUtil.post('/panel/api/clients/bulkDel', payload, JSON_HEADERS);
return parseMsg(raw, BulkDeleteResultSchema, 'clients/bulkDel');
},
onSuccess: () => invalidateAll(),
onSuccess: (msg) => { if (msg?.success) invalidateAll(); },
});
const bulkCreateMut = useMutation({
mutationFn: async (payloads: unknown[]): Promise<Msg<BulkCreateResult>> => {
const raw = await HttpUtil.post('/panel/api/clients/bulkCreate', payloads, JSON_HEADERS);
return parseMsg(raw, BulkCreateResultSchema, 'clients/bulkCreate');
},
onSuccess: (msg) => { if (msg?.success) invalidateAll(); },
});
const bulkAdjustMut = useMutation({
@@ -271,10 +277,14 @@ export function useClients() {
if (!email) return Promise.resolve(null as unknown as Msg<unknown>);
return removeMut.mutateAsync({ email, keepTraffic });
}, [removeMut]);
const removeMany = useCallback((emails: string[], keepTraffic = false) => {
if (!Array.isArray(emails) || emails.length === 0) return Promise.resolve([] as Msg<unknown>[]);
return removeManyMut.mutateAsync({ emails, keepTraffic });
}, [removeManyMut]);
const bulkDelete = useCallback((emails: string[], keepTraffic = false) => {
if (!Array.isArray(emails) || emails.length === 0) return Promise.resolve(null as unknown as Msg<BulkDeleteResult>);
return bulkDeleteMut.mutateAsync({ emails, keepTraffic });
}, [bulkDeleteMut]);
const bulkCreate = useCallback((payloads: unknown[]) => {
if (!Array.isArray(payloads) || payloads.length === 0) return Promise.resolve(null as unknown as Msg<BulkCreateResult>);
return bulkCreateMut.mutateAsync(payloads);
}, [bulkCreateMut]);
const bulkAdjust = useCallback((emails: string[], addDays: number, addBytes: number) => {
if (!Array.isArray(emails) || emails.length === 0) return Promise.resolve(null);
return bulkAdjustMut.mutateAsync({ emails, addDays, addBytes });
@@ -379,9 +389,10 @@ export function useClients() {
pageSize,
refresh,
create,
bulkCreate,
update,
remove,
removeMany,
bulkDelete,
bulkAdjust,
attach,
detach,

View File

@@ -521,6 +521,20 @@ export const sections: readonly Section[] = [
body: '{\n "emails": ["alice", "bob"],\n "addDays": 30,\n "addBytes": 53687091200\n}',
response: '{\n "success": true,\n "obj": {\n "adjusted": 2,\n "skipped": [\n { "email": "carol", "reason": "unlimited expiry" }\n ]\n }\n}',
},
{
method: 'POST',
path: '/panel/api/clients/bulkDel',
summary: 'Delete many clients in one call. The server processes the list sequentially so each delete sees the committed state of the previous one — avoids the race the per-email fan-out had on the panel side. Pass keepTraffic=true to retain the xray_client_traffic rows after deletion.',
body: '{\n "emails": ["alice", "bob"],\n "keepTraffic": false\n}',
response: '{\n "success": true,\n "obj": {\n "deleted": 2,\n "skipped": [\n { "email": "carol", "reason": "client not found" }\n ]\n }\n}',
},
{
method: 'POST',
path: '/panel/api/clients/bulkCreate',
summary: 'Create many clients in one call. Body is a JSON array of {client, inboundIds} payloads — the same shape /add accepts. Items are processed sequentially; per-email skip reasons are returned for items that fail (e.g., duplicate email). Triggers a single Xray restart at the end if any inbound was running.',
body: '[\n {\n "client": {\n "email": "alice@example.com",\n "totalGB": 53687091200,\n "expiryTime": 0,\n "enable": true\n },\n "inboundIds": [7]\n },\n {\n "client": {\n "email": "bob@example.com",\n "totalGB": 53687091200,\n "expiryTime": 0,\n "enable": true\n },\n "inboundIds": [7, 9]\n }\n]',
response: '{\n "success": true,\n "obj": {\n "created": 2,\n "skipped": [\n { "email": "alice@example.com", "reason": "email already in use" }\n ]\n }\n}',
},
{
method: 'POST',
path: '/panel/api/clients/resetTraffic/:email',

View File

@@ -5,14 +5,13 @@ import { SyncOutlined } from '@ant-design/icons';
import dayjs from 'dayjs';
import type { Dayjs } from 'dayjs';
import { HttpUtil, RandomUtil, SizeFormatter } from '@/utils';
import { RandomUtil, SizeFormatter } from '@/utils';
import { TLS_FLOW_CONTROL } from '@/schemas/primitives';
import DateTimePicker from '@/components/DateTimePicker';
import type { InboundOption } from '@/hooks/useClients';
import { useClients, type InboundOption } from '@/hooks/useClients';
import { ClientBulkAddFormSchema, type ClientBulkAddFormValues } from '@/schemas/client';
const FLOW_OPTIONS = Object.values(TLS_FLOW_CONTROL);
const JSON_HEADERS = { headers: { 'Content-Type': 'application/json' } } as const;
const MULTI_CLIENT_PROTOCOLS = new Set([
'shadowsocks', 'vless', 'vmess', 'trojan', 'hysteria',
@@ -55,6 +54,7 @@ export default function ClientBulkAddModal({
}: ClientBulkAddModalProps) {
const { t } = useTranslation();
const [messageApi, messageContextHolder] = message.useMessage();
const { bulkCreate } = useClients();
const [form, setForm] = useState<FormState>(emptyForm);
const [delayedStart, setDelayedStart] = useState(false);
@@ -143,10 +143,9 @@ export default function ClientBulkAddModal({
if (emails.length === 0) return;
setSaving(true);
const silentJsonOpts = { ...JSON_HEADERS, silent: true };
try {
const results = await Promise.all(emails.map((email) => {
const client = {
const payloads = emails.map((email) => ({
client: {
email,
subId: form.subId || RandomUtil.randomLowerAndNum(16),
id: RandomUtil.randomUUID(),
@@ -158,21 +157,15 @@ export default function ClientBulkAddModal({
limitIp: Number(form.limitIp) || 0,
comment: form.comment,
enable: true,
};
const payload = { client, inboundIds: form.inboundIds };
return HttpUtil.post('/panel/api/clients/add', payload, silentJsonOpts);
},
inboundIds: form.inboundIds,
}));
let ok = 0;
let failed = 0;
let firstError = '';
for (const msg of results) {
if (msg?.success) ok++;
else {
failed++;
if (!firstError && msg?.msg) firstError = msg.msg;
}
}
if (failed === 0) {
const msg = await bulkCreate(payloads);
const ok = msg?.obj?.created ?? 0;
const skipped = msg?.obj?.skipped ?? [];
const failed = skipped.length;
const firstError = skipped[0]?.reason ?? msg?.msg ?? '';
if (failed === 0 && msg?.success) {
messageApi.success(t('pages.clients.toasts.bulkCreated', { count: ok }));
} else {
messageApi.warning(firstError

View File

@@ -103,7 +103,7 @@ export default function ClientsPage() {
setQuery,
inbounds, onlines, loading, fetched, subSettings,
ipLimitEnable, tgBotEnable, expireDiff, trafficDiff, pageSize,
create, update, remove, removeMany, bulkAdjust, attach, detach,
create, update, remove, bulkDelete, bulkAdjust, attach, detach,
resetTraffic, resetAllTraffics, delDepleted, setEnable,
applyTrafficEvent, applyClientStatsEvent,
hydrate,
@@ -406,19 +406,13 @@ export default function ClientsPage() {
okType: 'danger',
cancelText: t('cancel'),
onOk: async () => {
const results = await removeMany(emails);
const msg = await bulkDelete(emails);
setSelectedRowKeys([]);
let ok = 0;
let failed = 0;
let firstError = '';
for (const msg of results) {
if (msg?.success) ok++;
else {
failed++;
if (!firstError && msg?.msg) firstError = msg.msg;
}
}
if (failed === 0) {
const ok = msg?.obj?.deleted ?? 0;
const skipped = msg?.obj?.skipped ?? [];
const failed = skipped.length;
const firstError = skipped[0]?.reason ?? msg?.msg ?? '';
if (failed === 0 && msg?.success) {
messageApi.success(t('pages.clients.toasts.bulkDeleted', { count: ok }));
} else {
messageApi.warning(firstError

View File

@@ -77,6 +77,20 @@ export const BulkAdjustResultSchema = z.object({
.optional(),
});
export const BulkDeleteResultSchema = z.object({
deleted: z.number(),
skipped: z
.array(z.object({ email: z.string(), reason: z.string() }))
.optional(),
});
export const BulkCreateResultSchema = z.object({
created: z.number(),
skipped: z
.array(z.object({ email: z.string(), reason: z.string() }))
.optional(),
});
export const DelDepletedResultSchema = z.object({
deleted: z.number().optional(),
});
@@ -137,6 +151,8 @@ export type ClientsSummary = z.infer<typeof ClientsSummarySchema>;
export type ClientPageResponse = z.infer<typeof ClientPageResponseSchema>;
export type ClientHydrate = z.infer<typeof ClientHydrateSchema>;
export type BulkAdjustResult = z.infer<typeof BulkAdjustResultSchema>;
export type BulkDeleteResult = z.infer<typeof BulkDeleteResultSchema>;
export type BulkCreateResult = z.infer<typeof BulkCreateResultSchema>;
export type ClientBulkAddFormValues = z.infer<typeof ClientBulkAddFormSchema>;
export type ClientBulkAdjustFormValues = z.infer<typeof ClientBulkAdjustFormSchema>;
export type ClientFormValues = z.infer<typeof ClientFormSchema>;

View File

@@ -45,6 +45,8 @@ func (a *ClientController) initRouter(g *gin.RouterGroup) {
g.POST("/resetAllTraffics", a.resetAllTraffics)
g.POST("/delDepleted", a.delDepleted)
g.POST("/bulkAdjust", a.bulkAdjust)
g.POST("/bulkDel", a.bulkDelete)
g.POST("/bulkCreate", a.bulkCreate)
g.POST("/resetTraffic/:email", a.resetTrafficByEmail)
g.POST("/updateTraffic/:email", a.updateTrafficByEmail)
g.POST("/ips/:email", a.getIps)
@@ -203,6 +205,47 @@ func (a *ClientController) bulkAdjust(c *gin.Context) {
notifyClientsChanged()
}
type bulkDeleteRequest struct {
Emails []string `json:"emails"`
KeepTraffic bool `json:"keepTraffic"`
}
func (a *ClientController) bulkDelete(c *gin.Context) {
var req bulkDeleteRequest
if err := c.ShouldBindJSON(&req); err != nil {
jsonMsg(c, I18nWeb(c, "somethingWentWrong"), err)
return
}
result, needRestart, err := a.clientService.BulkDelete(&a.inboundService, req.Emails, req.KeepTraffic)
if err != nil {
jsonMsg(c, I18nWeb(c, "somethingWentWrong"), err)
return
}
jsonObj(c, result, nil)
if needRestart {
a.xrayService.SetToNeedRestart()
}
notifyClientsChanged()
}
func (a *ClientController) bulkCreate(c *gin.Context) {
var payloads []service.ClientCreatePayload
if err := c.ShouldBindJSON(&payloads); err != nil {
jsonMsg(c, I18nWeb(c, "somethingWentWrong"), err)
return
}
result, needRestart, err := a.clientService.BulkCreate(&a.inboundService, payloads)
if err != nil {
jsonMsg(c, I18nWeb(c, "somethingWentWrong"), err)
return
}
jsonObj(c, result, nil)
if needRestart {
a.xrayService.SetToNeedRestart()
}
notifyClientsChanged()
}
func (a *ClientController) delDepleted(c *gin.Context) {
deleted, needRestart, err := a.clientService.DelDepleted(&a.inboundService)
if err != nil {

View File

@@ -1265,6 +1265,379 @@ func (s *ClientService) BulkAdjust(inboundSvc *InboundService, emails []string,
return result, needRestart, nil
}
// BulkDeleteResult mirrors BulkAdjustResult: total deleted plus per-email
// skip reasons when an email could not be processed.
type BulkDeleteResult struct {
Deleted int `json:"deleted"`
Skipped []BulkDeleteReport `json:"skipped,omitempty"`
}
type BulkDeleteReport struct {
Email string `json:"email"`
Reason string `json:"reason"`
}
// BulkDelete removes every client in the list in one optimized pass.
// Instead of running the full single-delete pipeline N times (which would
// re-read, re-parse, and re-write each inbound's settings JSON for every
// email), it groups emails by inbound and performs a single
// read-modify-write per inbound. Per-row DB cleanups are also batched with
// IN-clause queries at the end. Errors on a particular email are recorded
// in the Skipped list and processing continues for the rest.
func (s *ClientService) BulkDelete(inboundSvc *InboundService, emails []string, keepTraffic bool) (BulkDeleteResult, bool, error) {
result := BulkDeleteResult{}
seen := map[string]struct{}{}
cleanEmails := make([]string, 0, len(emails))
for _, e := range emails {
e = strings.TrimSpace(e)
if e == "" {
continue
}
if _, ok := seen[e]; ok {
continue
}
seen[e] = struct{}{}
cleanEmails = append(cleanEmails, e)
}
if len(cleanEmails) == 0 {
return result, false, nil
}
db := database.GetDB()
var records []model.ClientRecord
if err := db.Where("email IN ?", cleanEmails).Find(&records).Error; err != nil {
return result, false, err
}
recordsByEmail := make(map[string]*model.ClientRecord, len(records))
for i := range records {
recordsByEmail[records[i].Email] = &records[i]
tombstoneClientEmail(records[i].Email)
}
skippedReasons := map[string]string{}
for _, email := range cleanEmails {
if _, ok := recordsByEmail[email]; !ok {
skippedReasons[email] = "client not found"
}
}
clientIds := make([]int, 0, len(recordsByEmail))
recordIdToEmail := make(map[int]string, len(recordsByEmail))
for _, r := range recordsByEmail {
clientIds = append(clientIds, r.Id)
recordIdToEmail[r.Id] = r.Email
}
emailsByInbound := map[int][]string{}
if len(clientIds) > 0 {
var mappings []model.ClientInbound
if err := db.Where("client_id IN ?", clientIds).Find(&mappings).Error; err != nil {
return result, false, err
}
for _, m := range mappings {
email, ok := recordIdToEmail[m.ClientId]
if !ok {
continue
}
emailsByInbound[m.InboundId] = append(emailsByInbound[m.InboundId], email)
}
}
needRestart := false
for inboundId, ibEmails := range emailsByInbound {
ibResult := s.bulkDelInboundClients(inboundSvc, inboundId, ibEmails, recordsByEmail)
if ibResult.needRestart {
needRestart = true
}
for email, reason := range ibResult.perEmailSkipped {
if _, already := skippedReasons[email]; !already {
skippedReasons[email] = reason
}
}
}
successEmails := make([]string, 0, len(recordsByEmail))
successIds := make([]int, 0, len(recordsByEmail))
for email, rec := range recordsByEmail {
if _, skipped := skippedReasons[email]; skipped {
continue
}
successEmails = append(successEmails, email)
successIds = append(successIds, rec.Id)
}
if len(successIds) > 0 {
if err := db.Where("client_id IN ?", successIds).Delete(&model.ClientInbound{}).Error; err != nil {
return result, needRestart, err
}
if !keepTraffic && len(successEmails) > 0 {
if err := db.Where("email IN ?", successEmails).Delete(&xray.ClientTraffic{}).Error; err != nil {
return result, needRestart, err
}
if err := db.Where("client_email IN ?", successEmails).Delete(&model.InboundClientIps{}).Error; err != nil {
return result, needRestart, err
}
}
if err := db.Where("id IN ?", successIds).Delete(&model.ClientRecord{}).Error; err != nil {
return result, needRestart, err
}
}
result.Deleted = len(successEmails)
for email, reason := range skippedReasons {
result.Skipped = append(result.Skipped, BulkDeleteReport{Email: email, Reason: reason})
}
return result, needRestart, nil
}
type bulkInboundDeleteResult struct {
perEmailSkipped map[string]string
needRestart bool
}
// bulkDelInboundClients removes multiple clients from a single inbound's
// settings JSON in one read-modify-write cycle, runs the xray runtime
// RemoveUser/DeleteUser calls, and persists the inbound. The returned map
// holds per-email failure reasons; emails not present in the map are
// considered successful for this inbound.
func (s *ClientService) bulkDelInboundClients(
inboundSvc *InboundService,
inboundId int,
emails []string,
records map[string]*model.ClientRecord,
) bulkInboundDeleteResult {
res := bulkInboundDeleteResult{perEmailSkipped: map[string]string{}}
defer lockInbound(inboundId).Unlock()
oldInbound, err := inboundSvc.GetInbound(inboundId)
if err != nil {
logger.Error("Load Old Data Error")
for _, e := range emails {
res.perEmailSkipped[e] = err.Error()
}
return res
}
var settings map[string]any
if err := json.Unmarshal([]byte(oldInbound.Settings), &settings); err != nil {
for _, e := range emails {
res.perEmailSkipped[e] = err.Error()
}
return res
}
clientKey := "id"
switch oldInbound.Protocol {
case model.Trojan:
clientKey = "password"
case model.Shadowsocks:
clientKey = "email"
case model.Hysteria, model.Hysteria2:
clientKey = "auth"
}
keyToEmail := make(map[string]string, len(emails))
for _, email := range emails {
rec := records[email]
if rec == nil {
res.perEmailSkipped[email] = "client not found"
continue
}
key := clientKeyForProtocol(oldInbound.Protocol, rec)
if key == "" {
res.perEmailSkipped[email] = "missing client key for protocol"
continue
}
keyToEmail[key] = email
}
interfaceClients, _ := settings["clients"].([]any)
newClients := make([]any, 0, len(interfaceClients))
foundEmails := map[string]bool{}
enableByEmail := map[string]bool{}
for _, client := range interfaceClients {
c, ok := client.(map[string]any)
if !ok {
newClients = append(newClients, client)
continue
}
cKey, _ := c[clientKey].(string)
if targetEmail, found := keyToEmail[cKey]; found {
foundEmails[targetEmail] = true
if em, _ := c["email"].(string); em != "" {
en, _ := c["enable"].(bool)
enableByEmail[em] = en
}
continue
}
newClients = append(newClients, client)
}
for _, email := range keyToEmail {
if !foundEmails[email] {
res.perEmailSkipped[email] = "Client Not Found In Inbound"
}
}
db := database.GetDB()
newClients = compactOrphans(db, newClients)
if newClients == nil {
newClients = []any{}
}
settings["clients"] = newClients
newSettings, err := json.MarshalIndent(settings, "", " ")
if err != nil {
for email := range foundEmails {
if _, skip := res.perEmailSkipped[email]; !skip {
res.perEmailSkipped[email] = err.Error()
}
}
return res
}
oldInbound.Settings = string(newSettings)
foundList := make([]string, 0, len(foundEmails))
for email := range foundEmails {
foundList = append(foundList, email)
}
notDepletedByEmail := map[string]bool{}
if len(foundList) > 0 {
type trafficRow struct {
Email string
Enable bool
}
var rows []trafficRow
if err := db.Model(xray.ClientTraffic{}).
Where("email IN ?", foundList).
Select("email, enable").
Scan(&rows).Error; err == nil {
for _, r := range rows {
notDepletedByEmail[r.Email] = r.Enable
}
}
}
for email := range foundEmails {
shared, sharedErr := inboundSvc.emailUsedByOtherInbounds(email, inboundId)
if sharedErr != nil {
res.perEmailSkipped[email] = sharedErr.Error()
delete(foundEmails, email)
continue
}
if shared {
continue
}
if delErr := inboundSvc.DelClientIPs(db, email); delErr != nil {
logger.Error("Error in delete client IPs")
res.perEmailSkipped[email] = delErr.Error()
delete(foundEmails, email)
continue
}
if delErr := inboundSvc.DelClientStat(db, email); delErr != nil {
logger.Error("Delete stats Data Error")
res.perEmailSkipped[email] = delErr.Error()
delete(foundEmails, email)
continue
}
}
if oldInbound.NodeID == nil {
rt, rterr := inboundSvc.runtimeFor(oldInbound)
if rterr != nil {
res.needRestart = true
} else {
for email := range foundEmails {
if !enableByEmail[email] || !notDepletedByEmail[email] {
continue
}
err1 := rt.RemoveUser(context.Background(), oldInbound, email)
if err1 == nil {
logger.Debug("Client deleted on", rt.Name(), ":", email)
} else if strings.Contains(err1.Error(), fmt.Sprintf("User %s not found.", email)) {
logger.Debug("User is already deleted. Nothing to do more...")
} else {
logger.Debug("Error in deleting client on", rt.Name(), ":", err1)
res.needRestart = true
}
}
}
} else {
rt, rterr := inboundSvc.runtimeFor(oldInbound)
if rterr != nil {
for email := range foundEmails {
res.perEmailSkipped[email] = rterr.Error()
delete(foundEmails, email)
}
} else {
for email := range foundEmails {
if err1 := rt.DeleteUser(context.Background(), oldInbound, email); err1 != nil {
res.perEmailSkipped[email] = err1.Error()
delete(foundEmails, email)
}
}
}
}
if err := db.Save(oldInbound).Error; err != nil {
for email := range foundEmails {
if _, skip := res.perEmailSkipped[email]; !skip {
res.perEmailSkipped[email] = err.Error()
}
}
return res
}
finalClients, err := inboundSvc.GetClients(oldInbound)
if err != nil {
return res
}
if err := s.SyncInbound(db, inboundId, finalClients); err != nil {
return res
}
return res
}
// BulkCreateResult mirrors BulkAdjustResult for the create flow.
type BulkCreateResult struct {
Created int `json:"created"`
Skipped []BulkCreateReport `json:"skipped,omitempty"`
}
type BulkCreateReport struct {
Email string `json:"email"`
Reason string `json:"reason"`
}
// BulkCreate iterates payloads sequentially. Each item is the same shape
// the single-create endpoint accepts, so callers can submit a heterogeneous
// list (different inboundIds, plans, etc.) in one round-trip.
func (s *ClientService) BulkCreate(inboundSvc *InboundService, payloads []ClientCreatePayload) (BulkCreateResult, bool, error) {
result := BulkCreateResult{}
needRestart := false
for i := range payloads {
p := payloads[i]
email := strings.TrimSpace(p.Client.Email)
nr, err := s.Create(inboundSvc, &p)
if err != nil {
if email == "" {
email = "(missing email)"
}
result.Skipped = append(result.Skipped, BulkCreateReport{Email: email, Reason: err.Error()})
continue
}
if nr {
needRestart = true
}
result.Created++
}
return result, needRestart, nil
}
func (s *ClientService) DelDepleted(inboundSvc *InboundService) (int, bool, error) {
db := database.GetDB()
now := time.Now().UnixMilli()