mirror of
https://github.com/MHSanaei/3x-ui.git
synced 2026-05-26 07:08:01 +00:00
* perf(inbounds): slim list payload + lazy hydrate for row actions
Adds GET /panel/api/inbounds/list/slim that returns the same list shape
but strips every per-client field besides email/enable/comment from
settings.clients[] and skips UUID/SubId enrichment on ClientStats.
The inbounds page only reads those three to compute its client counters
and badges, so the slim variant trims tens of bytes per client (uuid,
password, flow, security, totalGB, expiryTime, limitIp, tgId, ...).
On a panel with thousands of clients this is the dominant load-time
cost.
Detail flows (edit / info / qr / export / clone) call /get/:id through
a new hydrateInbound helper before opening — the slim list view never
needs the secrets it doesn't render.
* perf(clients): server-side pagination + slim row payload
Adds GET /panel/api/clients/list/paged that filters, sorts, and paginates
on the server, returns a slim row shape (drops uuid/password/auth/flow/
security/reverse/tgId per client), and includes a stable summary
(total, active, online[], depleted[], expiring[], deactive[]) computed
across the full DB row set so the dashboard cards don't change as the
user paginates or filters. Page size capped at 200.
useClients now exposes { clients (current page), total, filtered, query,
setQuery, summary, hydrate }. ClientsPage feeds its filter/sort/page
state into setQuery via a single effect, debounces search by 300ms, and
hydrates the full client record via /get/:email before opening edit/info/
qr modals. Local filter/sort logic and the all-clients summary memo are
gone.
On a 2000-client panel this turns the initial response from ~MB to ~25 row
slice (~10s of KB) and removes the all-client parse cost from every
refresh.
* perf(settings): use /inbounds/options for LDAP tag picker
The General settings tab only needs each inbound's tag/protocol/port to
fill a dropdown but was calling /panel/api/inbounds/list which ships the
full settings JSON with every embedded client. Switched it to /options
and added Tag to the projection. On a panel with thousands of clients
this drops the General-tab load payload from megabytes to a tiny
per-inbound row each.
* perf(clients): de-duplicate options + paged list fetches
Two issues caused each clients-page load to fire its requests twice:
1. setQuery in the hook took whatever object the consumer passed and
stored it as-is. The consumer (ClientsPage) constructs a new object
literal in an effect, so even when nothing actually changed the ref
was new — the hook's useEffect saw a new query and re-fetched.
Wrapped setQuery with a shallow value compare so identical params
are a no-op.
2. The picker /inbounds/options fetch was bundled into refresh() with a
length==0 guard, but the two back-to-back refreshes both saw an
empty inbounds array (the first hadn't resolved yet) so both fired
the request. Moved the options fetch into its own one-shot effect.
* perf(inbounds): share nodes list with form modal instead of refetching
InboundsPage and InboundFormModal both called useNodes() — each
instance maintains its own state and fires its own /panel/api/nodes/list
fetch on mount. Since the modal is always rendered (open or not), every
page load hit the endpoint twice.
Threaded nodes from the page through an availableNodes prop on the form
modal so they share one fetch.
* docs(api): register /clients/list/paged endpoint
TestAPIRoutesDocumented was failing because the new paginated clients
endpoint added in this branch wasn't listed in endpoints.js.
3184 lines
91 KiB
Go
3184 lines
91 KiB
Go
// Package service provides business logic services for the 3x-ui web panel,
|
||
// including inbound/outbound management, user administration, settings, and Xray integration.
|
||
package service
|
||
|
||
import (
|
||
"context"
|
||
"encoding/json"
|
||
"fmt"
|
||
"sort"
|
||
"strconv"
|
||
"strings"
|
||
"time"
|
||
|
||
"github.com/google/uuid"
|
||
"github.com/mhsanaei/3x-ui/v3/database"
|
||
"github.com/mhsanaei/3x-ui/v3/database/model"
|
||
"github.com/mhsanaei/3x-ui/v3/logger"
|
||
"github.com/mhsanaei/3x-ui/v3/util/common"
|
||
"github.com/mhsanaei/3x-ui/v3/web/runtime"
|
||
"github.com/mhsanaei/3x-ui/v3/xray"
|
||
|
||
"gorm.io/gorm"
|
||
"gorm.io/gorm/clause"
|
||
)
|
||
|
||
type InboundService struct {
|
||
xrayApi xray.XrayAPI
|
||
clientService ClientService
|
||
fallbackService FallbackService
|
||
}
|
||
|
||
func (s *InboundService) runtimeFor(ib *model.Inbound) (runtime.Runtime, error) {
|
||
mgr := runtime.GetManager()
|
||
if mgr == nil {
|
||
return nil, fmt.Errorf("runtime manager not initialised")
|
||
}
|
||
return mgr.RuntimeFor(ib.NodeID)
|
||
}
|
||
|
||
type CopyClientsResult struct {
|
||
Added []string `json:"added"`
|
||
Skipped []string `json:"skipped"`
|
||
Errors []string `json:"errors"`
|
||
}
|
||
|
||
// enrichClientStats parses each inbound's clients once, fills in the
|
||
// UUID/SubId fields on the preloaded ClientStats, and tops up rows owned by
|
||
// a sibling inbound (shared-email mode — the row is keyed on email so it
|
||
// only preloads on its owning inbound).
|
||
func (s *InboundService) enrichClientStats(db *gorm.DB, inbounds []*model.Inbound) {
|
||
if len(inbounds) == 0 {
|
||
return
|
||
}
|
||
clientsByInbound := make([][]model.Client, len(inbounds))
|
||
seenByInbound := make([]map[string]struct{}, len(inbounds))
|
||
missing := make(map[string]struct{})
|
||
for i, inbound := range inbounds {
|
||
clients, _ := s.GetClients(inbound)
|
||
clientsByInbound[i] = clients
|
||
seen := make(map[string]struct{}, len(inbound.ClientStats))
|
||
for _, st := range inbound.ClientStats {
|
||
if st.Email != "" {
|
||
seen[strings.ToLower(st.Email)] = struct{}{}
|
||
}
|
||
}
|
||
seenByInbound[i] = seen
|
||
for _, c := range clients {
|
||
if c.Email == "" {
|
||
continue
|
||
}
|
||
if _, ok := seen[strings.ToLower(c.Email)]; !ok {
|
||
missing[c.Email] = struct{}{}
|
||
}
|
||
}
|
||
}
|
||
if len(missing) > 0 {
|
||
emails := make([]string, 0, len(missing))
|
||
for e := range missing {
|
||
emails = append(emails, e)
|
||
}
|
||
var extra []xray.ClientTraffic
|
||
if err := db.Model(xray.ClientTraffic{}).Where("email IN ?", emails).Find(&extra).Error; err != nil {
|
||
logger.Warning("enrichClientStats:", err)
|
||
} else {
|
||
byEmail := make(map[string]xray.ClientTraffic, len(extra))
|
||
for _, st := range extra {
|
||
byEmail[strings.ToLower(st.Email)] = st
|
||
}
|
||
for i, inbound := range inbounds {
|
||
for _, c := range clientsByInbound[i] {
|
||
if c.Email == "" {
|
||
continue
|
||
}
|
||
key := strings.ToLower(c.Email)
|
||
if _, ok := seenByInbound[i][key]; ok {
|
||
continue
|
||
}
|
||
if st, ok := byEmail[key]; ok {
|
||
inbound.ClientStats = append(inbound.ClientStats, st)
|
||
seenByInbound[i][key] = struct{}{}
|
||
}
|
||
}
|
||
}
|
||
}
|
||
}
|
||
for i, inbound := range inbounds {
|
||
clients := clientsByInbound[i]
|
||
if len(clients) == 0 || len(inbound.ClientStats) == 0 {
|
||
continue
|
||
}
|
||
cMap := make(map[string]model.Client, len(clients))
|
||
for _, c := range clients {
|
||
cMap[strings.ToLower(c.Email)] = c
|
||
}
|
||
for j := range inbound.ClientStats {
|
||
email := strings.ToLower(inbound.ClientStats[j].Email)
|
||
if c, ok := cMap[email]; ok {
|
||
inbound.ClientStats[j].UUID = c.ID
|
||
inbound.ClientStats[j].SubId = c.SubID
|
||
}
|
||
}
|
||
}
|
||
}
|
||
|
||
// GetInbounds retrieves all inbounds for a specific user with client stats.
|
||
func (s *InboundService) GetInbounds(userId int) ([]*model.Inbound, error) {
|
||
db := database.GetDB()
|
||
var inbounds []*model.Inbound
|
||
err := db.Model(model.Inbound{}).Preload("ClientStats").Where("user_id = ?", userId).Find(&inbounds).Error
|
||
if err != nil && err != gorm.ErrRecordNotFound {
|
||
return nil, err
|
||
}
|
||
s.enrichClientStats(db, inbounds)
|
||
s.annotateFallbackParents(db, inbounds)
|
||
return inbounds, nil
|
||
}
|
||
|
||
// GetInboundsSlim returns the same list of inbounds as GetInbounds but
|
||
// strips every per-client field other than email / enable / comment from
|
||
// settings.clients and skips UUID/SubId enrichment on ClientStats. The
|
||
// inbounds page only needs those three to roll up client counts and
|
||
// render badges, so this trims tens of bytes per client (UUID, password,
|
||
// flow, security, totalGB, expiryTime, limitIp, tgId, ...) which adds
|
||
// up fast on installs with thousands of clients.
|
||
//
|
||
// Full client data is still available through GET /panel/api/inbounds/get/:id
|
||
// for the edit/info/qr/export/clone flows that need it.
|
||
func (s *InboundService) GetInboundsSlim(userId int) ([]*model.Inbound, error) {
|
||
db := database.GetDB()
|
||
var inbounds []*model.Inbound
|
||
err := db.Model(model.Inbound{}).Preload("ClientStats").Where("user_id = ?", userId).Find(&inbounds).Error
|
||
if err != nil && err != gorm.ErrRecordNotFound {
|
||
return nil, err
|
||
}
|
||
s.annotateFallbackParents(db, inbounds)
|
||
for _, ib := range inbounds {
|
||
ib.Settings = slimSettingsClients(ib.Settings)
|
||
}
|
||
return inbounds, nil
|
||
}
|
||
|
||
// slimSettingsClients rewrites the inbound settings JSON so settings.clients[]
|
||
// keeps only the fields the list view actually reads. Returns the input
|
||
// unchanged when the JSON can't be parsed or has no clients array.
|
||
func slimSettingsClients(settings string) string {
|
||
if settings == "" {
|
||
return settings
|
||
}
|
||
var raw map[string]any
|
||
if err := json.Unmarshal([]byte(settings), &raw); err != nil {
|
||
return settings
|
||
}
|
||
clients, ok := raw["clients"].([]any)
|
||
if !ok || len(clients) == 0 {
|
||
return settings
|
||
}
|
||
slim := make([]any, 0, len(clients))
|
||
for _, entry := range clients {
|
||
c, ok := entry.(map[string]any)
|
||
if !ok {
|
||
continue
|
||
}
|
||
row := make(map[string]any, 3)
|
||
if v, ok := c["email"]; ok {
|
||
row["email"] = v
|
||
}
|
||
if v, ok := c["enable"]; ok {
|
||
row["enable"] = v
|
||
}
|
||
if v, ok := c["comment"]; ok && v != "" {
|
||
row["comment"] = v
|
||
}
|
||
slim = append(slim, row)
|
||
}
|
||
raw["clients"] = slim
|
||
out, err := json.Marshal(raw)
|
||
if err != nil {
|
||
return settings
|
||
}
|
||
return string(out)
|
||
}
|
||
|
||
// annotateFallbackParents fills FallbackParent on each inbound that is
|
||
// the child side of a fallback rule. One DB round-trip serves the full
|
||
// list — the frontend needs this to rewrite the child's client-share
|
||
// link so it points at the master's reachable endpoint.
|
||
func (s *InboundService) annotateFallbackParents(db *gorm.DB, inbounds []*model.Inbound) {
|
||
if len(inbounds) == 0 {
|
||
return
|
||
}
|
||
childIds := make([]int, 0, len(inbounds))
|
||
for _, ib := range inbounds {
|
||
childIds = append(childIds, ib.Id)
|
||
}
|
||
var rows []model.InboundFallback
|
||
if err := db.Where("child_id IN ?", childIds).
|
||
Order("sort_order ASC, id ASC").
|
||
Find(&rows).Error; err != nil {
|
||
return
|
||
}
|
||
first := make(map[int]model.InboundFallback, len(rows))
|
||
for _, r := range rows {
|
||
if _, ok := first[r.ChildId]; !ok {
|
||
first[r.ChildId] = r
|
||
}
|
||
}
|
||
for _, ib := range inbounds {
|
||
if r, ok := first[ib.Id]; ok {
|
||
ib.FallbackParent = &model.FallbackParentInfo{
|
||
MasterId: r.MasterId,
|
||
Path: r.Path,
|
||
}
|
||
}
|
||
}
|
||
}
|
||
|
||
// InboundOption is the lightweight projection of an inbound used by client UI
|
||
// pickers — only the fields needed to render labels, filter by protocol, and
|
||
// decide whether the XTLS Vision flow selector should appear. Keeping this
|
||
// payload minimal avoids shipping per-client settings and traffic stats just
|
||
// to populate a dropdown.
|
||
type InboundOption struct {
|
||
Id int `json:"id"`
|
||
Remark string `json:"remark"`
|
||
Tag string `json:"tag"`
|
||
Protocol string `json:"protocol"`
|
||
Port int `json:"port"`
|
||
TlsFlowCapable bool `json:"tlsFlowCapable"`
|
||
}
|
||
|
||
// GetInboundOptions returns the picker-sized projection of the user's inbounds.
|
||
// The TlsFlowCapable flag mirrors Inbound.canEnableTlsFlow() on the frontend
|
||
// (VLESS over TCP with tls or reality) so the client modal does not need
|
||
// StreamSettings to decide whether to show the Flow field.
|
||
func (s *InboundService) GetInboundOptions(userId int) ([]InboundOption, error) {
|
||
db := database.GetDB()
|
||
var rows []struct {
|
||
Id int `gorm:"column:id"`
|
||
Remark string `gorm:"column:remark"`
|
||
Tag string `gorm:"column:tag"`
|
||
Protocol string `gorm:"column:protocol"`
|
||
Port int `gorm:"column:port"`
|
||
StreamSettings string `gorm:"column:stream_settings"`
|
||
}
|
||
err := db.Table("inbounds").
|
||
Select("id, remark, tag, protocol, port, stream_settings").
|
||
Where("user_id = ?", userId).
|
||
Order("id ASC").
|
||
Scan(&rows).Error
|
||
if err != nil && err != gorm.ErrRecordNotFound {
|
||
return nil, err
|
||
}
|
||
out := make([]InboundOption, 0, len(rows))
|
||
for _, r := range rows {
|
||
out = append(out, InboundOption{
|
||
Id: r.Id,
|
||
Remark: r.Remark,
|
||
Tag: r.Tag,
|
||
Protocol: r.Protocol,
|
||
Port: r.Port,
|
||
TlsFlowCapable: inboundCanEnableTlsFlow(r.Protocol, r.StreamSettings),
|
||
})
|
||
}
|
||
return out, nil
|
||
}
|
||
|
||
// inboundCanEnableTlsFlow mirrors Inbound.canEnableTlsFlow() from the frontend:
|
||
// XTLS Vision is only valid for VLESS on TCP with tls or reality.
|
||
func inboundCanEnableTlsFlow(protocol, streamSettings string) bool {
|
||
if protocol != string(model.VLESS) {
|
||
return false
|
||
}
|
||
if streamSettings == "" {
|
||
return false
|
||
}
|
||
var stream struct {
|
||
Network string `json:"network"`
|
||
Security string `json:"security"`
|
||
}
|
||
if err := json.Unmarshal([]byte(streamSettings), &stream); err != nil {
|
||
return false
|
||
}
|
||
if stream.Network != "tcp" {
|
||
return false
|
||
}
|
||
return stream.Security == "tls" || stream.Security == "reality"
|
||
}
|
||
|
||
// inboundCanHostFallbacks gates the settings.fallbacks injection.
|
||
// Xray only honors fallbacks on VLESS and Trojan inbounds carried over
|
||
// TCP transport with TLS or Reality security.
|
||
func inboundCanHostFallbacks(ib *model.Inbound) bool {
|
||
if ib == nil {
|
||
return false
|
||
}
|
||
if ib.Protocol != model.VLESS && ib.Protocol != model.Trojan {
|
||
return false
|
||
}
|
||
return inboundCanEnableTlsFlow(string(ib.Protocol), ib.StreamSettings) ||
|
||
(ib.Protocol == model.Trojan && trojanStreamSupportsFallbacks(ib.StreamSettings))
|
||
}
|
||
|
||
// trojanStreamSupportsFallbacks mirrors the Trojan side of the same gate
|
||
// (Trojan reuses XTLS-Vision capable streams: tcp + tls or reality).
|
||
func trojanStreamSupportsFallbacks(streamSettings string) bool {
|
||
if streamSettings == "" {
|
||
return false
|
||
}
|
||
var stream struct {
|
||
Network string `json:"network"`
|
||
Security string `json:"security"`
|
||
}
|
||
if err := json.Unmarshal([]byte(streamSettings), &stream); err != nil {
|
||
return false
|
||
}
|
||
if stream.Network != "tcp" {
|
||
return false
|
||
}
|
||
return stream.Security == "tls" || stream.Security == "reality"
|
||
}
|
||
|
||
// GetAllInbounds retrieves all inbounds with client stats.
|
||
func (s *InboundService) GetAllInbounds() ([]*model.Inbound, error) {
|
||
db := database.GetDB()
|
||
var inbounds []*model.Inbound
|
||
err := db.Model(model.Inbound{}).Preload("ClientStats").Find(&inbounds).Error
|
||
if err != nil && err != gorm.ErrRecordNotFound {
|
||
return nil, err
|
||
}
|
||
s.enrichClientStats(db, inbounds)
|
||
return inbounds, nil
|
||
}
|
||
|
||
func (s *InboundService) GetInboundsByTrafficReset(period string) ([]*model.Inbound, error) {
|
||
db := database.GetDB()
|
||
var inbounds []*model.Inbound
|
||
err := db.Model(model.Inbound{}).Where("traffic_reset = ?", period).Find(&inbounds).Error
|
||
if err != nil && err != gorm.ErrRecordNotFound {
|
||
return nil, err
|
||
}
|
||
return inbounds, nil
|
||
}
|
||
|
||
func (s *InboundService) GetClients(inbound *model.Inbound) ([]model.Client, error) {
|
||
settings := map[string][]model.Client{}
|
||
json.Unmarshal([]byte(inbound.Settings), &settings)
|
||
if settings == nil {
|
||
return nil, fmt.Errorf("setting is null")
|
||
}
|
||
|
||
clients := settings["clients"]
|
||
if clients == nil {
|
||
return nil, nil
|
||
}
|
||
return clients, nil
|
||
}
|
||
|
||
func (s *InboundService) getAllEmails() ([]string, error) {
|
||
db := database.GetDB()
|
||
var emails []string
|
||
query := fmt.Sprintf(
|
||
"SELECT DISTINCT %s %s",
|
||
database.JSONFieldText("client.value", "email"),
|
||
database.JSONClientsFromInbound(),
|
||
)
|
||
if err := db.Raw(query).Scan(&emails).Error; err != nil {
|
||
return nil, err
|
||
}
|
||
return emails, nil
|
||
}
|
||
|
||
// getAllEmailSubIDs returns email→subId. An email seen with two different
|
||
// non-empty subIds is locked (mapped to "") so neither identity can claim it.
|
||
func (s *InboundService) getAllEmailSubIDs() (map[string]string, error) {
|
||
db := database.GetDB()
|
||
var rows []struct {
|
||
Email string
|
||
SubID string
|
||
}
|
||
query := fmt.Sprintf(
|
||
"SELECT %s AS email, %s AS sub_id %s",
|
||
database.JSONFieldText("client.value", "email"),
|
||
database.JSONFieldText("client.value", "subId"),
|
||
database.JSONClientsFromInbound(),
|
||
)
|
||
if err := db.Raw(query).Scan(&rows).Error; err != nil {
|
||
return nil, err
|
||
}
|
||
result := make(map[string]string, len(rows))
|
||
for _, r := range rows {
|
||
email := strings.ToLower(r.Email)
|
||
if email == "" {
|
||
continue
|
||
}
|
||
subID := r.SubID
|
||
if existing, ok := result[email]; ok {
|
||
if existing != subID {
|
||
result[email] = ""
|
||
}
|
||
continue
|
||
}
|
||
result[email] = subID
|
||
}
|
||
return result, nil
|
||
}
|
||
|
||
// emailUsedByOtherInbounds reports whether email lives in any inbound other
|
||
// than exceptInboundId. Empty email returns false.
|
||
func (s *InboundService) emailUsedByOtherInbounds(email string, exceptInboundId int) (bool, error) {
|
||
if email == "" {
|
||
return false, nil
|
||
}
|
||
db := database.GetDB()
|
||
var count int64
|
||
query := fmt.Sprintf(
|
||
"SELECT COUNT(*) %s WHERE inbounds.id != ? AND LOWER(%s) = LOWER(?)",
|
||
database.JSONClientsFromInbound(),
|
||
database.JSONFieldText("client.value", "email"),
|
||
)
|
||
if err := db.Raw(query, exceptInboundId, email).Scan(&count).Error; err != nil {
|
||
return false, err
|
||
}
|
||
return count > 0, nil
|
||
}
|
||
|
||
// normalizeStreamSettings clears StreamSettings for protocols that don't use it.
|
||
// Only vmess, vless, trojan, shadowsocks, and hysteria protocols use streamSettings.
|
||
func (s *InboundService) normalizeStreamSettings(inbound *model.Inbound) {
|
||
protocolsWithStream := map[model.Protocol]bool{
|
||
model.VMESS: true,
|
||
model.VLESS: true,
|
||
model.Trojan: true,
|
||
model.Shadowsocks: true,
|
||
model.Hysteria: true,
|
||
model.Hysteria2: true,
|
||
}
|
||
|
||
if !protocolsWithStream[inbound.Protocol] {
|
||
inbound.StreamSettings = ""
|
||
}
|
||
}
|
||
|
||
// AddInbound creates a new inbound configuration.
|
||
// It validates port uniqueness, client email uniqueness, and required fields,
|
||
// then saves the inbound to the database and optionally adds it to the running Xray instance.
|
||
// Returns the created inbound, whether Xray needs restart, and any error.
|
||
func (s *InboundService) AddInbound(inbound *model.Inbound) (*model.Inbound, bool, error) {
|
||
// Normalize streamSettings based on protocol
|
||
s.normalizeStreamSettings(inbound)
|
||
|
||
exist, err := s.checkPortConflict(inbound, 0)
|
||
if err != nil {
|
||
return inbound, false, err
|
||
}
|
||
if exist {
|
||
return inbound, false, common.NewError("Port already exists:", inbound.Port)
|
||
}
|
||
|
||
inbound.Tag, err = s.resolveInboundTag(inbound, 0)
|
||
if err != nil {
|
||
return inbound, false, err
|
||
}
|
||
|
||
clients, err := s.GetClients(inbound)
|
||
if err != nil {
|
||
return inbound, false, err
|
||
}
|
||
existEmail, err := s.clientService.checkEmailsExistForClients(s, clients)
|
||
if err != nil {
|
||
return inbound, false, err
|
||
}
|
||
if existEmail != "" {
|
||
return inbound, false, common.NewError("Duplicate email:", existEmail)
|
||
}
|
||
|
||
// Ensure created_at and updated_at on clients in settings
|
||
if len(clients) > 0 {
|
||
var settings map[string]any
|
||
if err2 := json.Unmarshal([]byte(inbound.Settings), &settings); err2 == nil && settings != nil {
|
||
now := time.Now().Unix() * 1000
|
||
updatedClients := make([]model.Client, 0, len(clients))
|
||
for _, c := range clients {
|
||
if c.CreatedAt == 0 {
|
||
c.CreatedAt = now
|
||
}
|
||
c.UpdatedAt = now
|
||
updatedClients = append(updatedClients, c)
|
||
}
|
||
settings["clients"] = updatedClients
|
||
if bs, err3 := json.MarshalIndent(settings, "", " "); err3 == nil {
|
||
inbound.Settings = string(bs)
|
||
} else {
|
||
logger.Debug("Unable to marshal inbound settings with timestamps:", err3)
|
||
}
|
||
} else if err2 != nil {
|
||
logger.Debug("Unable to parse inbound settings for timestamps:", err2)
|
||
}
|
||
}
|
||
|
||
// Secure client ID
|
||
for _, client := range clients {
|
||
switch inbound.Protocol {
|
||
case "trojan":
|
||
if client.Password == "" {
|
||
return inbound, false, common.NewError("empty client ID")
|
||
}
|
||
case "shadowsocks":
|
||
if client.Email == "" {
|
||
return inbound, false, common.NewError("empty client ID")
|
||
}
|
||
case "hysteria", "hysteria2":
|
||
if client.Auth == "" {
|
||
return inbound, false, common.NewError("empty client ID")
|
||
}
|
||
default:
|
||
if client.ID == "" {
|
||
return inbound, false, common.NewError("empty client ID")
|
||
}
|
||
}
|
||
}
|
||
|
||
db := database.GetDB()
|
||
tx := db.Begin()
|
||
defer func() {
|
||
if err == nil {
|
||
tx.Commit()
|
||
} else {
|
||
tx.Rollback()
|
||
}
|
||
}()
|
||
|
||
err = tx.Save(inbound).Error
|
||
if err == nil {
|
||
if len(inbound.ClientStats) == 0 {
|
||
for _, client := range clients {
|
||
s.AddClientStat(tx, inbound.Id, &client)
|
||
}
|
||
}
|
||
} else {
|
||
return inbound, false, err
|
||
}
|
||
|
||
if err = s.clientService.SyncInbound(tx, inbound.Id, clients); err != nil {
|
||
return inbound, false, err
|
||
}
|
||
|
||
needRestart := false
|
||
if inbound.Enable {
|
||
rt, rterr := s.runtimeFor(inbound)
|
||
if rterr != nil {
|
||
err = rterr
|
||
return inbound, false, err
|
||
}
|
||
if err1 := rt.AddInbound(context.Background(), inbound); err1 == nil {
|
||
logger.Debug("New inbound added on", rt.Name(), ":", inbound.Tag)
|
||
} else {
|
||
logger.Debug("Unable to add inbound on", rt.Name(), ":", err1)
|
||
if inbound.NodeID != nil {
|
||
err = err1
|
||
return inbound, false, err
|
||
}
|
||
needRestart = true
|
||
}
|
||
}
|
||
|
||
return inbound, needRestart, err
|
||
}
|
||
|
||
func (s *InboundService) DelInbound(id int) (bool, error) {
|
||
db := database.GetDB()
|
||
|
||
needRestart := false
|
||
var ib model.Inbound
|
||
loadErr := db.Model(model.Inbound{}).Where("id = ?", id).First(&ib).Error
|
||
if loadErr == nil {
|
||
shouldPushToRuntime := ib.NodeID != nil || ib.Enable
|
||
if shouldPushToRuntime {
|
||
rt, rterr := s.runtimeFor(&ib)
|
||
if rterr != nil {
|
||
logger.Warning("DelInbound: runtime lookup failed, deleting central row anyway:", rterr)
|
||
if ib.NodeID == nil {
|
||
needRestart = true
|
||
}
|
||
} else if err1 := rt.DelInbound(context.Background(), &ib); err1 == nil {
|
||
logger.Debug("Inbound deleted on", rt.Name(), ":", ib.Tag)
|
||
} else {
|
||
logger.Warning("DelInbound on", rt.Name(), "failed, deleting central row anyway:", err1)
|
||
if ib.NodeID == nil {
|
||
needRestart = true
|
||
}
|
||
}
|
||
} else {
|
||
logger.Debug("DelInbound: skipping runtime push for disabled local inbound id:", id)
|
||
}
|
||
} else {
|
||
logger.Debug("DelInbound: inbound not found, id:", id)
|
||
}
|
||
|
||
// Delete client traffics of inbounds
|
||
err := db.Where("inbound_id = ?", id).Delete(xray.ClientTraffic{}).Error
|
||
if err != nil {
|
||
return false, err
|
||
}
|
||
if err := s.clientService.DetachInbound(db, id); err != nil {
|
||
return false, err
|
||
}
|
||
inbound, err := s.GetInbound(id)
|
||
if err != nil {
|
||
return false, err
|
||
}
|
||
clients, err := s.GetClients(inbound)
|
||
if err != nil {
|
||
return false, err
|
||
}
|
||
// Bulk-delete client IPs for every email in this inbound. The previous
|
||
// per-client loop fired one DELETE per row — at 7k+ clients that meant
|
||
// thousands of synchronous SQL roundtrips and a multi-second freeze.
|
||
// Chunked to stay under SQLite's bind-variable limit on huge inbounds.
|
||
if len(clients) > 0 {
|
||
emails := make([]string, 0, len(clients))
|
||
for i := range clients {
|
||
if clients[i].Email != "" {
|
||
emails = append(emails, clients[i].Email)
|
||
}
|
||
}
|
||
for _, batch := range chunkStrings(uniqueNonEmptyStrings(emails), sqliteMaxVars) {
|
||
if err := db.Where("client_email IN ?", batch).
|
||
Delete(model.InboundClientIps{}).Error; err != nil {
|
||
return false, err
|
||
}
|
||
}
|
||
}
|
||
|
||
return needRestart, db.Delete(model.Inbound{}, id).Error
|
||
}
|
||
|
||
func (s *InboundService) GetInbound(id int) (*model.Inbound, error) {
|
||
db := database.GetDB()
|
||
inbound := &model.Inbound{}
|
||
err := db.Model(model.Inbound{}).First(inbound, id).Error
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
return inbound, nil
|
||
}
|
||
|
||
// SetInboundEnable toggles only the enable flag of an inbound, without
|
||
// rewriting the (potentially multi-MB) settings JSON. Used by the UI's
|
||
// per-row enable switch — for inbounds with thousands of clients the full
|
||
// UpdateInbound path is an order of magnitude too slow for an interactive
|
||
// toggle (parses + reserialises every client, runs O(N) traffic diff).
|
||
//
|
||
// Returns (needRestart, error). needRestart is true when the xray runtime
|
||
// could not be re-synced from the cached config and a full restart is
|
||
// required to pick up the change.
|
||
func (s *InboundService) SetInboundEnable(id int, enable bool) (bool, error) {
|
||
inbound, err := s.GetInbound(id)
|
||
if err != nil {
|
||
return false, err
|
||
}
|
||
if inbound.Enable == enable {
|
||
return false, nil
|
||
}
|
||
|
||
db := database.GetDB()
|
||
if err := db.Model(model.Inbound{}).Where("id = ?", id).
|
||
Update("enable", enable).Error; err != nil {
|
||
return false, err
|
||
}
|
||
inbound.Enable = enable
|
||
|
||
needRestart := false
|
||
rt, rterr := s.runtimeFor(inbound)
|
||
if rterr != nil {
|
||
if inbound.NodeID != nil {
|
||
return false, rterr
|
||
}
|
||
return true, nil
|
||
}
|
||
|
||
// Remote nodes interpret DelInbound as a real row delete (it hits
|
||
// panel/api/inbounds/del/:id on the remote), so toggling the enable
|
||
// switch on a remote inbound used to wipe the row entirely (#4402).
|
||
// PATCH the remote row via UpdateInbound instead — preserves the
|
||
// settings/client history and just flips the enable flag.
|
||
if inbound.NodeID != nil {
|
||
if err := rt.UpdateInbound(context.Background(), inbound, inbound); err != nil {
|
||
logger.Debug("SetInboundEnable: remote UpdateInbound on", rt.Name(), "failed:", err)
|
||
return false, err
|
||
}
|
||
return false, nil
|
||
}
|
||
|
||
if err := rt.DelInbound(context.Background(), inbound); err != nil &&
|
||
!strings.Contains(err.Error(), "not found") {
|
||
logger.Debug("SetInboundEnable: DelInbound on", rt.Name(), "failed:", err)
|
||
needRestart = true
|
||
}
|
||
if !enable {
|
||
return needRestart, nil
|
||
}
|
||
|
||
runtimeInbound, err := s.buildRuntimeInboundForAPI(db, inbound)
|
||
if err != nil {
|
||
logger.Debug("SetInboundEnable: build runtime config failed:", err)
|
||
return true, nil
|
||
}
|
||
if err := rt.AddInbound(context.Background(), runtimeInbound); err != nil {
|
||
logger.Debug("SetInboundEnable: AddInbound on", rt.Name(), "failed:", err)
|
||
needRestart = true
|
||
}
|
||
return needRestart, nil
|
||
}
|
||
|
||
func (s *InboundService) UpdateInbound(inbound *model.Inbound) (*model.Inbound, bool, error) {
|
||
// Normalize streamSettings based on protocol
|
||
s.normalizeStreamSettings(inbound)
|
||
|
||
exist, err := s.checkPortConflict(inbound, inbound.Id)
|
||
if err != nil {
|
||
return inbound, false, err
|
||
}
|
||
if exist {
|
||
return inbound, false, common.NewError("Port already exists:", inbound.Port)
|
||
}
|
||
|
||
oldInbound, err := s.GetInbound(inbound.Id)
|
||
if err != nil {
|
||
return inbound, false, err
|
||
}
|
||
|
||
tag := oldInbound.Tag
|
||
|
||
db := database.GetDB()
|
||
tx := db.Begin()
|
||
|
||
defer func() {
|
||
if err != nil {
|
||
tx.Rollback()
|
||
} else {
|
||
tx.Commit()
|
||
}
|
||
}()
|
||
|
||
err = s.updateClientTraffics(tx, oldInbound, inbound)
|
||
if err != nil {
|
||
return inbound, false, err
|
||
}
|
||
|
||
// Ensure created_at and updated_at exist in inbound.Settings clients
|
||
{
|
||
var oldSettings map[string]any
|
||
_ = json.Unmarshal([]byte(oldInbound.Settings), &oldSettings)
|
||
emailToCreated := map[string]int64{}
|
||
emailToUpdated := map[string]int64{}
|
||
if oldSettings != nil {
|
||
if oc, ok := oldSettings["clients"].([]any); ok {
|
||
for _, it := range oc {
|
||
if m, ok2 := it.(map[string]any); ok2 {
|
||
if email, ok3 := m["email"].(string); ok3 {
|
||
switch v := m["created_at"].(type) {
|
||
case float64:
|
||
emailToCreated[email] = int64(v)
|
||
case int64:
|
||
emailToCreated[email] = v
|
||
}
|
||
switch v := m["updated_at"].(type) {
|
||
case float64:
|
||
emailToUpdated[email] = int64(v)
|
||
case int64:
|
||
emailToUpdated[email] = v
|
||
}
|
||
}
|
||
}
|
||
}
|
||
}
|
||
}
|
||
var newSettings map[string]any
|
||
if err2 := json.Unmarshal([]byte(inbound.Settings), &newSettings); err2 == nil && newSettings != nil {
|
||
now := time.Now().Unix() * 1000
|
||
if nSlice, ok := newSettings["clients"].([]any); ok {
|
||
for i := range nSlice {
|
||
if m, ok2 := nSlice[i].(map[string]any); ok2 {
|
||
email, _ := m["email"].(string)
|
||
if _, ok3 := m["created_at"]; !ok3 {
|
||
if v, ok4 := emailToCreated[email]; ok4 && v > 0 {
|
||
m["created_at"] = v
|
||
} else {
|
||
m["created_at"] = now
|
||
}
|
||
}
|
||
// Preserve client's updated_at if present; do not bump on parent inbound update
|
||
if _, hasUpdated := m["updated_at"]; !hasUpdated {
|
||
if v, ok4 := emailToUpdated[email]; ok4 && v > 0 {
|
||
m["updated_at"] = v
|
||
}
|
||
}
|
||
nSlice[i] = m
|
||
}
|
||
}
|
||
newSettings["clients"] = nSlice
|
||
if bs, err3 := json.MarshalIndent(newSettings, "", " "); err3 == nil {
|
||
inbound.Settings = string(bs)
|
||
}
|
||
}
|
||
}
|
||
}
|
||
|
||
oldInbound.Total = inbound.Total
|
||
oldInbound.Remark = inbound.Remark
|
||
oldInbound.Enable = inbound.Enable
|
||
oldInbound.ExpiryTime = inbound.ExpiryTime
|
||
oldInbound.TrafficReset = inbound.TrafficReset
|
||
oldInbound.Listen = inbound.Listen
|
||
oldInbound.Port = inbound.Port
|
||
oldInbound.Protocol = inbound.Protocol
|
||
oldInbound.Settings = inbound.Settings
|
||
oldInbound.StreamSettings = inbound.StreamSettings
|
||
oldInbound.Sniffing = inbound.Sniffing
|
||
oldInbound.Tag, err = s.resolveInboundTag(inbound, inbound.Id)
|
||
if err != nil {
|
||
return inbound, false, err
|
||
}
|
||
|
||
needRestart := false
|
||
rt, rterr := s.runtimeFor(oldInbound)
|
||
if rterr != nil {
|
||
if oldInbound.NodeID != nil {
|
||
err = rterr
|
||
return inbound, false, err
|
||
}
|
||
needRestart = true
|
||
} else {
|
||
oldSnapshot := *oldInbound
|
||
oldSnapshot.Tag = tag
|
||
if oldInbound.NodeID == nil {
|
||
if err2 := rt.DelInbound(context.Background(), &oldSnapshot); err2 == nil {
|
||
logger.Debug("Old inbound deleted on", rt.Name(), ":", tag)
|
||
}
|
||
if inbound.Enable {
|
||
runtimeInbound, err2 := s.buildRuntimeInboundForAPI(tx, oldInbound)
|
||
if err2 != nil {
|
||
logger.Debug("Unable to prepare runtime inbound config:", err2)
|
||
needRestart = true
|
||
} else if err2 := rt.AddInbound(context.Background(), runtimeInbound); err2 == nil {
|
||
logger.Debug("Updated inbound added on", rt.Name(), ":", oldInbound.Tag)
|
||
} else {
|
||
logger.Debug("Unable to update inbound on", rt.Name(), ":", err2)
|
||
needRestart = true
|
||
}
|
||
}
|
||
} else {
|
||
if !inbound.Enable {
|
||
if err2 := rt.DelInbound(context.Background(), &oldSnapshot); err2 != nil {
|
||
err = err2
|
||
return inbound, false, err
|
||
}
|
||
} else if err2 := rt.UpdateInbound(context.Background(), &oldSnapshot, oldInbound); err2 != nil {
|
||
err = err2
|
||
return inbound, false, err
|
||
}
|
||
}
|
||
}
|
||
|
||
if err = tx.Save(oldInbound).Error; err != nil {
|
||
return inbound, false, err
|
||
}
|
||
newClients, gcErr := s.GetClients(oldInbound)
|
||
if gcErr != nil {
|
||
err = gcErr
|
||
return inbound, false, err
|
||
}
|
||
if err = s.clientService.SyncInbound(tx, oldInbound.Id, newClients); err != nil {
|
||
return inbound, false, err
|
||
}
|
||
return inbound, needRestart, nil
|
||
}
|
||
|
||
func (s *InboundService) buildRuntimeInboundForAPI(tx *gorm.DB, inbound *model.Inbound) (*model.Inbound, error) {
|
||
if inbound == nil {
|
||
return nil, fmt.Errorf("inbound is nil")
|
||
}
|
||
|
||
runtimeInbound := *inbound
|
||
settings := map[string]any{}
|
||
if err := json.Unmarshal([]byte(inbound.Settings), &settings); err != nil {
|
||
return nil, err
|
||
}
|
||
|
||
clients, ok := settings["clients"].([]any)
|
||
if !ok {
|
||
return &runtimeInbound, nil
|
||
}
|
||
|
||
var clientStats []xray.ClientTraffic
|
||
err := tx.Model(xray.ClientTraffic{}).
|
||
Where("inbound_id = ?", inbound.Id).
|
||
Select("email", "enable").
|
||
Find(&clientStats).Error
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
|
||
enableMap := make(map[string]bool, len(clientStats))
|
||
for _, clientTraffic := range clientStats {
|
||
enableMap[clientTraffic.Email] = clientTraffic.Enable
|
||
}
|
||
|
||
finalClients := make([]any, 0, len(clients))
|
||
for _, client := range clients {
|
||
c, ok := client.(map[string]any)
|
||
if !ok {
|
||
continue
|
||
}
|
||
|
||
email, _ := c["email"].(string)
|
||
if enable, exists := enableMap[email]; exists && !enable {
|
||
continue
|
||
}
|
||
|
||
if manualEnable, ok := c["enable"].(bool); ok && !manualEnable {
|
||
continue
|
||
}
|
||
|
||
finalClients = append(finalClients, c)
|
||
}
|
||
|
||
settings["clients"] = finalClients
|
||
modifiedSettings, err := json.MarshalIndent(settings, "", " ")
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
runtimeInbound.Settings = string(modifiedSettings)
|
||
|
||
return &runtimeInbound, nil
|
||
}
|
||
|
||
// updateClientTraffics syncs the ClientTraffic rows with the inbound's clients
|
||
// list: removes rows for emails that disappeared, inserts rows for newly-added
|
||
// emails. Uses sets for O(N) lookup — the previous nested-loop implementation
|
||
// was O(N²) and degraded into multi-second pauses on inbounds with thousands
|
||
// of clients (toggling, saving, or deleting any such inbound felt frozen).
|
||
func (s *InboundService) updateClientTraffics(tx *gorm.DB, oldInbound *model.Inbound, newInbound *model.Inbound) error {
|
||
oldClients, err := s.GetClients(oldInbound)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
newClients, err := s.GetClients(newInbound)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
|
||
// Email is the unique key for ClientTraffic rows. Clients without an
|
||
// email have no stats row to sync — skip them on both sides instead of
|
||
// risking a unique-constraint hit or accidental delete of an unrelated row.
|
||
oldEmails := make(map[string]struct{}, len(oldClients))
|
||
for i := range oldClients {
|
||
if oldClients[i].Email == "" {
|
||
continue
|
||
}
|
||
oldEmails[oldClients[i].Email] = struct{}{}
|
||
}
|
||
newEmails := make(map[string]struct{}, len(newClients))
|
||
for i := range newClients {
|
||
if newClients[i].Email == "" {
|
||
continue
|
||
}
|
||
newEmails[newClients[i].Email] = struct{}{}
|
||
}
|
||
|
||
// Drop stats rows for removed emails — but not when a sibling inbound
|
||
// still references the email, since the row is the shared accumulator.
|
||
for i := range oldClients {
|
||
email := oldClients[i].Email
|
||
if email == "" {
|
||
continue
|
||
}
|
||
if _, kept := newEmails[email]; kept {
|
||
continue
|
||
}
|
||
stillUsed, err := s.emailUsedByOtherInbounds(email, oldInbound.Id)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
if stillUsed {
|
||
continue
|
||
}
|
||
if err := s.DelClientStat(tx, email); err != nil {
|
||
return err
|
||
}
|
||
}
|
||
for i := range newClients {
|
||
email := newClients[i].Email
|
||
if email == "" {
|
||
continue
|
||
}
|
||
if _, existed := oldEmails[email]; existed {
|
||
if err := s.UpdateClientStat(tx, email, &newClients[i]); err != nil {
|
||
return err
|
||
}
|
||
continue
|
||
}
|
||
if err := s.AddClientStat(tx, oldInbound.Id, &newClients[i]); err != nil {
|
||
return err
|
||
}
|
||
}
|
||
return nil
|
||
}
|
||
|
||
func (s *InboundService) getClientPrimaryKey(protocol model.Protocol, client model.Client) string {
|
||
switch protocol {
|
||
case model.Trojan:
|
||
return client.Password
|
||
case model.Shadowsocks:
|
||
return client.Email
|
||
case model.Hysteria:
|
||
return client.Auth
|
||
default:
|
||
return client.ID
|
||
}
|
||
}
|
||
|
||
func (s *InboundService) writeBackClientSubID(sourceInboundID int, sourceProtocol model.Protocol, client model.Client, subID string) (bool, error) {
|
||
client.SubID = subID
|
||
client.UpdatedAt = time.Now().UnixMilli()
|
||
clientID := s.getClientPrimaryKey(sourceProtocol, client)
|
||
if clientID == "" {
|
||
return false, common.NewError("empty client ID")
|
||
}
|
||
|
||
settingsBytes, err := json.Marshal(map[string][]model.Client{
|
||
"clients": {client},
|
||
})
|
||
if err != nil {
|
||
return false, err
|
||
}
|
||
|
||
updatePayload := &model.Inbound{
|
||
Id: sourceInboundID,
|
||
Settings: string(settingsBytes),
|
||
}
|
||
return s.clientService.UpdateInboundClient(s, updatePayload, clientID)
|
||
}
|
||
|
||
func (s *InboundService) generateRandomCredential(targetProtocol model.Protocol) string {
|
||
switch targetProtocol {
|
||
case model.VMESS, model.VLESS:
|
||
return uuid.NewString()
|
||
default:
|
||
return strings.ReplaceAll(uuid.NewString(), "-", "")
|
||
}
|
||
}
|
||
|
||
func (s *InboundService) buildTargetClientFromSource(source model.Client, targetProtocol model.Protocol, email string, flow string) (model.Client, error) {
|
||
nowTs := time.Now().UnixMilli()
|
||
target := source
|
||
target.Email = email
|
||
target.CreatedAt = nowTs
|
||
target.UpdatedAt = nowTs
|
||
|
||
target.ID = ""
|
||
target.Password = ""
|
||
target.Auth = ""
|
||
target.Flow = ""
|
||
|
||
switch targetProtocol {
|
||
case model.VMESS:
|
||
target.ID = s.generateRandomCredential(targetProtocol)
|
||
case model.VLESS:
|
||
target.ID = s.generateRandomCredential(targetProtocol)
|
||
if flow == "xtls-rprx-vision" || flow == "xtls-rprx-vision-udp443" {
|
||
target.Flow = flow
|
||
}
|
||
case model.Trojan, model.Shadowsocks:
|
||
target.Password = s.generateRandomCredential(targetProtocol)
|
||
case model.Hysteria:
|
||
target.Auth = s.generateRandomCredential(targetProtocol)
|
||
default:
|
||
target.ID = s.generateRandomCredential(targetProtocol)
|
||
}
|
||
|
||
return target, nil
|
||
}
|
||
|
||
func (s *InboundService) nextAvailableCopiedEmail(originalEmail string, targetID int, occupied map[string]struct{}) string {
|
||
base := fmt.Sprintf("%s_%d", originalEmail, targetID)
|
||
candidate := base
|
||
suffix := 0
|
||
for {
|
||
if _, exists := occupied[strings.ToLower(candidate)]; !exists {
|
||
occupied[strings.ToLower(candidate)] = struct{}{}
|
||
return candidate
|
||
}
|
||
suffix++
|
||
candidate = fmt.Sprintf("%s_%d", base, suffix)
|
||
}
|
||
}
|
||
|
||
func (s *InboundService) CopyInboundClients(targetInboundID int, sourceInboundID int, clientEmails []string, flow string) (*CopyClientsResult, bool, error) {
|
||
result := &CopyClientsResult{
|
||
Added: []string{},
|
||
Skipped: []string{},
|
||
Errors: []string{},
|
||
}
|
||
if targetInboundID == sourceInboundID {
|
||
return result, false, common.NewError("source and target inbounds must be different")
|
||
}
|
||
|
||
targetInbound, err := s.GetInbound(targetInboundID)
|
||
if err != nil {
|
||
return result, false, err
|
||
}
|
||
sourceInbound, err := s.GetInbound(sourceInboundID)
|
||
if err != nil {
|
||
return result, false, err
|
||
}
|
||
|
||
sourceClients, err := s.GetClients(sourceInbound)
|
||
if err != nil {
|
||
return result, false, err
|
||
}
|
||
if len(sourceClients) == 0 {
|
||
return result, false, nil
|
||
}
|
||
|
||
allowedEmails := map[string]struct{}{}
|
||
if len(clientEmails) > 0 {
|
||
for _, email := range clientEmails {
|
||
allowedEmails[strings.ToLower(strings.TrimSpace(email))] = struct{}{}
|
||
}
|
||
}
|
||
|
||
occupiedEmails := map[string]struct{}{}
|
||
allEmails, err := s.getAllEmails()
|
||
if err != nil {
|
||
return result, false, err
|
||
}
|
||
for _, email := range allEmails {
|
||
clean := strings.Trim(email, "\"")
|
||
if clean != "" {
|
||
occupiedEmails[strings.ToLower(clean)] = struct{}{}
|
||
}
|
||
}
|
||
|
||
newClients := make([]model.Client, 0)
|
||
needRestart := false
|
||
for _, sourceClient := range sourceClients {
|
||
originalEmail := strings.TrimSpace(sourceClient.Email)
|
||
if originalEmail == "" {
|
||
continue
|
||
}
|
||
if len(allowedEmails) > 0 {
|
||
if _, ok := allowedEmails[strings.ToLower(originalEmail)]; !ok {
|
||
continue
|
||
}
|
||
}
|
||
|
||
if sourceClient.SubID == "" {
|
||
newSubID := uuid.NewString()
|
||
subNeedRestart, subErr := s.writeBackClientSubID(sourceInbound.Id, sourceInbound.Protocol, sourceClient, newSubID)
|
||
if subErr != nil {
|
||
result.Errors = append(result.Errors, fmt.Sprintf("%s: failed to write source subId: %v", originalEmail, subErr))
|
||
continue
|
||
}
|
||
if subNeedRestart {
|
||
needRestart = true
|
||
}
|
||
sourceClient.SubID = newSubID
|
||
}
|
||
|
||
targetEmail := s.nextAvailableCopiedEmail(originalEmail, targetInboundID, occupiedEmails)
|
||
targetClient, buildErr := s.buildTargetClientFromSource(sourceClient, targetInbound.Protocol, targetEmail, flow)
|
||
if buildErr != nil {
|
||
result.Errors = append(result.Errors, fmt.Sprintf("%s: %v", originalEmail, buildErr))
|
||
continue
|
||
}
|
||
newClients = append(newClients, targetClient)
|
||
result.Added = append(result.Added, targetEmail)
|
||
}
|
||
|
||
if len(newClients) == 0 {
|
||
return result, needRestart, nil
|
||
}
|
||
|
||
settingsPayload, err := json.Marshal(map[string][]model.Client{
|
||
"clients": newClients,
|
||
})
|
||
if err != nil {
|
||
return result, needRestart, err
|
||
}
|
||
|
||
addNeedRestart, err := s.clientService.AddInboundClient(s, &model.Inbound{
|
||
Id: targetInboundID,
|
||
Settings: string(settingsPayload),
|
||
})
|
||
if err != nil {
|
||
return result, needRestart, err
|
||
}
|
||
if addNeedRestart {
|
||
needRestart = true
|
||
}
|
||
|
||
return result, needRestart, nil
|
||
}
|
||
|
||
const resetGracePeriodMs int64 = 30000
|
||
|
||
// onlineGracePeriodMs must comfortably exceed the 5s traffic-poll interval —
|
||
// Xray's stats counters often report a zero delta for an active session across
|
||
// a single poll, so a 5s grace would still drop the client on the next tick.
|
||
// ~4 polls of slack keeps idle-but-connected clients visible without lingering
|
||
// long after a real disconnect.
|
||
const onlineGracePeriodMs int64 = 20000
|
||
|
||
func (s *InboundService) SetRemoteTraffic(nodeID int, snap *runtime.TrafficSnapshot) (bool, error) {
|
||
var structuralChange bool
|
||
err := submitTrafficWrite(func() error {
|
||
var inner error
|
||
structuralChange, inner = s.setRemoteTrafficLocked(nodeID, snap)
|
||
return inner
|
||
})
|
||
return structuralChange, err
|
||
}
|
||
|
||
func (s *InboundService) setRemoteTrafficLocked(nodeID int, snap *runtime.TrafficSnapshot) (bool, error) {
|
||
if snap == nil || nodeID <= 0 {
|
||
return false, nil
|
||
}
|
||
db := database.GetDB()
|
||
now := time.Now().UnixMilli()
|
||
|
||
var central []model.Inbound
|
||
if err := db.Model(model.Inbound{}).
|
||
Where("node_id = ?", nodeID).
|
||
Find(¢ral).Error; err != nil {
|
||
return false, err
|
||
}
|
||
tagToCentral := make(map[string]*model.Inbound, len(central))
|
||
for i := range central {
|
||
tagToCentral[central[i].Tag] = ¢ral[i]
|
||
}
|
||
|
||
var centralClientStats []xray.ClientTraffic
|
||
if len(central) > 0 {
|
||
ids := make([]int, 0, len(central))
|
||
for i := range central {
|
||
ids = append(ids, central[i].Id)
|
||
}
|
||
if err := db.Model(xray.ClientTraffic{}).
|
||
Where("inbound_id IN ?", ids).
|
||
Find(¢ralClientStats).Error; err != nil {
|
||
return false, err
|
||
}
|
||
}
|
||
type csKey struct {
|
||
inboundID int
|
||
email string
|
||
}
|
||
centralCS := make(map[csKey]*xray.ClientTraffic, len(centralClientStats))
|
||
centralCSByEmail := make(map[string]*xray.ClientTraffic, len(centralClientStats))
|
||
for i := range centralClientStats {
|
||
centralCS[csKey{centralClientStats[i].InboundId, centralClientStats[i].Email}] = ¢ralClientStats[i]
|
||
centralCSByEmail[centralClientStats[i].Email] = ¢ralClientStats[i]
|
||
}
|
||
|
||
var defaultUserId int
|
||
if len(central) > 0 {
|
||
defaultUserId = central[0].UserId
|
||
} else {
|
||
var u model.User
|
||
if err := db.Model(model.User{}).Order("id asc").First(&u).Error; err == nil {
|
||
defaultUserId = u.Id
|
||
} else {
|
||
defaultUserId = 1
|
||
}
|
||
}
|
||
|
||
tx := db.Begin()
|
||
committed := false
|
||
defer func() {
|
||
if !committed {
|
||
tx.Rollback()
|
||
}
|
||
}()
|
||
|
||
structuralChange := false
|
||
|
||
snapTags := make(map[string]struct{}, len(snap.Inbounds))
|
||
for _, snapIb := range snap.Inbounds {
|
||
if snapIb == nil {
|
||
continue
|
||
}
|
||
snapTags[snapIb.Tag] = struct{}{}
|
||
|
||
c, ok := tagToCentral[snapIb.Tag]
|
||
if !ok {
|
||
newIb := model.Inbound{
|
||
UserId: defaultUserId,
|
||
NodeID: &nodeID,
|
||
Tag: snapIb.Tag,
|
||
Listen: snapIb.Listen,
|
||
Port: snapIb.Port,
|
||
Protocol: snapIb.Protocol,
|
||
Settings: snapIb.Settings,
|
||
StreamSettings: snapIb.StreamSettings,
|
||
Sniffing: snapIb.Sniffing,
|
||
TrafficReset: snapIb.TrafficReset,
|
||
Enable: snapIb.Enable,
|
||
Remark: snapIb.Remark,
|
||
Total: snapIb.Total,
|
||
ExpiryTime: snapIb.ExpiryTime,
|
||
Up: snapIb.Up,
|
||
Down: snapIb.Down,
|
||
}
|
||
if err := tx.Create(&newIb).Error; err != nil {
|
||
logger.Warning("setRemoteTraffic: create central inbound for tag", snapIb.Tag, "failed:", err)
|
||
continue
|
||
}
|
||
tagToCentral[snapIb.Tag] = &newIb
|
||
structuralChange = true
|
||
continue
|
||
}
|
||
|
||
inGrace := c.LastTrafficResetTime > 0 && now-c.LastTrafficResetTime < resetGracePeriodMs
|
||
|
||
updates := map[string]any{
|
||
"enable": snapIb.Enable,
|
||
"remark": snapIb.Remark,
|
||
"listen": snapIb.Listen,
|
||
"port": snapIb.Port,
|
||
"protocol": snapIb.Protocol,
|
||
"total": snapIb.Total,
|
||
"expiry_time": snapIb.ExpiryTime,
|
||
"settings": snapIb.Settings,
|
||
"stream_settings": snapIb.StreamSettings,
|
||
"sniffing": snapIb.Sniffing,
|
||
"traffic_reset": snapIb.TrafficReset,
|
||
}
|
||
if !inGrace || (snapIb.Up+snapIb.Down) <= (c.Up+c.Down) {
|
||
updates["up"] = snapIb.Up
|
||
updates["down"] = snapIb.Down
|
||
}
|
||
|
||
if c.Settings != snapIb.Settings ||
|
||
c.Remark != snapIb.Remark ||
|
||
c.Listen != snapIb.Listen ||
|
||
c.Port != snapIb.Port ||
|
||
c.Total != snapIb.Total ||
|
||
c.ExpiryTime != snapIb.ExpiryTime ||
|
||
c.Enable != snapIb.Enable {
|
||
structuralChange = true
|
||
}
|
||
|
||
if err := tx.Model(model.Inbound{}).
|
||
Where("id = ?", c.Id).
|
||
Updates(updates).Error; err != nil {
|
||
return false, err
|
||
}
|
||
}
|
||
|
||
for _, c := range central {
|
||
if _, kept := snapTags[c.Tag]; kept {
|
||
continue
|
||
}
|
||
if err := tx.Where("inbound_id = ?", c.Id).
|
||
Delete(&xray.ClientTraffic{}).Error; err != nil {
|
||
return false, err
|
||
}
|
||
if err := tx.Where("id = ?", c.Id).
|
||
Delete(&model.Inbound{}).Error; err != nil {
|
||
return false, err
|
||
}
|
||
delete(tagToCentral, c.Tag)
|
||
structuralChange = true
|
||
}
|
||
|
||
for _, snapIb := range snap.Inbounds {
|
||
if snapIb == nil {
|
||
continue
|
||
}
|
||
c, ok := tagToCentral[snapIb.Tag]
|
||
if !ok {
|
||
continue
|
||
}
|
||
inGrace := c.LastTrafficResetTime > 0 && now-c.LastTrafficResetTime < resetGracePeriodMs
|
||
|
||
snapEmails := make(map[string]struct{}, len(snapIb.ClientStats))
|
||
for _, cs := range snapIb.ClientStats {
|
||
snapEmails[cs.Email] = struct{}{}
|
||
|
||
existing := centralCS[csKey{c.Id, cs.Email}]
|
||
if existing == nil {
|
||
existing = centralCSByEmail[cs.Email]
|
||
}
|
||
if existing == nil {
|
||
row := &xray.ClientTraffic{
|
||
InboundId: c.Id,
|
||
Email: cs.Email,
|
||
Enable: cs.Enable,
|
||
Total: cs.Total,
|
||
ExpiryTime: cs.ExpiryTime,
|
||
Reset: cs.Reset,
|
||
Up: cs.Up,
|
||
Down: cs.Down,
|
||
LastOnline: cs.LastOnline,
|
||
}
|
||
if err := tx.Clauses(clause.OnConflict{Columns: []clause.Column{{Name: "email"}}, DoNothing: true}).
|
||
Create(row).Error; err != nil {
|
||
return false, err
|
||
}
|
||
centralCS[csKey{c.Id, cs.Email}] = row
|
||
centralCSByEmail[cs.Email] = row
|
||
structuralChange = true
|
||
continue
|
||
}
|
||
|
||
if existing.Enable != cs.Enable ||
|
||
existing.Total != cs.Total ||
|
||
existing.ExpiryTime != cs.ExpiryTime ||
|
||
existing.Reset != cs.Reset {
|
||
structuralChange = true
|
||
}
|
||
|
||
if inGrace && cs.Up+cs.Down > 0 {
|
||
if err := tx.Exec(
|
||
`UPDATE client_traffics
|
||
SET enable = ?, total = ?, expiry_time = ?, reset = ?
|
||
WHERE email = ?`,
|
||
cs.Enable, cs.Total, cs.ExpiryTime, cs.Reset, cs.Email,
|
||
).Error; err != nil {
|
||
return false, err
|
||
}
|
||
continue
|
||
}
|
||
|
||
if err := tx.Exec(
|
||
`UPDATE client_traffics
|
||
SET up = ?, down = ?, enable = ?, total = ?, expiry_time = ?, reset = ?,
|
||
last_online = MAX(last_online, ?)
|
||
WHERE email = ?`,
|
||
cs.Up, cs.Down, cs.Enable, cs.Total, cs.ExpiryTime, cs.Reset,
|
||
cs.LastOnline, cs.Email,
|
||
).Error; err != nil {
|
||
return false, err
|
||
}
|
||
}
|
||
|
||
for k, existing := range centralCS {
|
||
if k.inboundID != c.Id {
|
||
continue
|
||
}
|
||
if _, kept := snapEmails[k.email]; kept {
|
||
continue
|
||
}
|
||
if err := tx.Where("inbound_id = ? AND email = ?", c.Id, existing.Email).
|
||
Delete(&xray.ClientTraffic{}).Error; err != nil {
|
||
return false, err
|
||
}
|
||
structuralChange = true
|
||
}
|
||
}
|
||
|
||
type oldSet struct {
|
||
inboundID int
|
||
emails map[string]struct{}
|
||
}
|
||
var perInboundOld []oldSet
|
||
for _, snapIb := range snap.Inbounds {
|
||
if snapIb == nil {
|
||
continue
|
||
}
|
||
c, ok := tagToCentral[snapIb.Tag]
|
||
if !ok {
|
||
continue
|
||
}
|
||
var oldEmailsRows []string
|
||
if err := tx.Table("clients").
|
||
Joins("JOIN client_inbounds ON client_inbounds.client_id = clients.id").
|
||
Where("client_inbounds.inbound_id = ?", c.Id).
|
||
Pluck("email", &oldEmailsRows).Error; err == nil {
|
||
oldEmails := make(map[string]struct{}, len(oldEmailsRows))
|
||
for _, e := range oldEmailsRows {
|
||
if e != "" {
|
||
oldEmails[e] = struct{}{}
|
||
}
|
||
}
|
||
perInboundOld = append(perInboundOld, oldSet{inboundID: c.Id, emails: oldEmails})
|
||
}
|
||
|
||
clients, gcErr := s.GetClients(snapIb)
|
||
if gcErr != nil {
|
||
logger.Warning("setRemoteTraffic: parse clients for tag", snapIb.Tag, "failed:", gcErr)
|
||
continue
|
||
}
|
||
csEnableByEmail := make(map[string]bool, len(snapIb.ClientStats))
|
||
for _, cs := range snapIb.ClientStats {
|
||
csEnableByEmail[cs.Email] = cs.Enable
|
||
}
|
||
filtered := clients[:0]
|
||
for i := range clients {
|
||
if isClientEmailTombstoned(clients[i].Email) {
|
||
continue
|
||
}
|
||
if cse, hit := csEnableByEmail[clients[i].Email]; hit && !cse {
|
||
clients[i].Enable = false
|
||
}
|
||
filtered = append(filtered, clients[i])
|
||
}
|
||
if err := s.clientService.SyncInbound(tx, c.Id, filtered); err != nil {
|
||
logger.Warning("setRemoteTraffic: sync clients for tag", snapIb.Tag, "failed:", err)
|
||
}
|
||
}
|
||
|
||
for _, old := range perInboundOld {
|
||
var stillAttached []string
|
||
if err := tx.Table("clients").
|
||
Joins("JOIN client_inbounds ON client_inbounds.client_id = clients.id").
|
||
Where("client_inbounds.inbound_id = ?", old.inboundID).
|
||
Pluck("email", &stillAttached).Error; err != nil {
|
||
continue
|
||
}
|
||
stillSet := make(map[string]struct{}, len(stillAttached))
|
||
for _, e := range stillAttached {
|
||
stillSet[e] = struct{}{}
|
||
}
|
||
for email := range old.emails {
|
||
if _, kept := stillSet[email]; kept {
|
||
continue
|
||
}
|
||
var attachmentCount int64
|
||
if err := tx.Table("client_inbounds").
|
||
Joins("JOIN clients ON clients.id = client_inbounds.client_id").
|
||
Where("clients.email = ?", email).
|
||
Count(&attachmentCount).Error; err != nil {
|
||
continue
|
||
}
|
||
if attachmentCount > 0 {
|
||
continue
|
||
}
|
||
if err := tx.Where("email = ?", email).Delete(&model.ClientRecord{}).Error; err != nil {
|
||
logger.Warning("setRemoteTraffic: delete ClientRecord", email, "failed:", err)
|
||
}
|
||
if err := tx.Where("email = ?", email).Delete(&xray.ClientTraffic{}).Error; err != nil {
|
||
logger.Warning("setRemoteTraffic: delete ClientTraffic", email, "failed:", err)
|
||
}
|
||
structuralChange = true
|
||
}
|
||
}
|
||
|
||
if err := tx.Commit().Error; err != nil {
|
||
return false, err
|
||
}
|
||
committed = true
|
||
|
||
if p != nil {
|
||
p.SetNodeOnlineClients(nodeID, snap.OnlineEmails)
|
||
}
|
||
|
||
return structuralChange, nil
|
||
}
|
||
|
||
func (s *InboundService) AddTraffic(inboundTraffics []*xray.Traffic, clientTraffics []*xray.ClientTraffic) (needRestart bool, clientsDisabled bool, err error) {
|
||
err = submitTrafficWrite(func() error {
|
||
var inner error
|
||
needRestart, clientsDisabled, inner = s.addTrafficLocked(inboundTraffics, clientTraffics)
|
||
return inner
|
||
})
|
||
return
|
||
}
|
||
|
||
func (s *InboundService) addTrafficLocked(inboundTraffics []*xray.Traffic, clientTraffics []*xray.ClientTraffic) (bool, bool, error) {
|
||
var err error
|
||
db := database.GetDB()
|
||
tx := db.Begin()
|
||
|
||
defer func() {
|
||
if err != nil {
|
||
tx.Rollback()
|
||
} else {
|
||
tx.Commit()
|
||
}
|
||
}()
|
||
err = s.addInboundTraffic(tx, inboundTraffics)
|
||
if err != nil {
|
||
return false, false, err
|
||
}
|
||
err = s.addClientTraffic(tx, clientTraffics)
|
||
if err != nil {
|
||
return false, false, err
|
||
}
|
||
|
||
needRestart0, count, err := s.autoRenewClients(tx)
|
||
if err != nil {
|
||
logger.Warning("Error in renew clients:", err)
|
||
} else if count > 0 {
|
||
logger.Debugf("%v clients renewed", count)
|
||
}
|
||
|
||
disabledClientsCount := int64(0)
|
||
needRestart1, count, err := s.disableInvalidClients(tx)
|
||
if err != nil {
|
||
logger.Warning("Error in disabling invalid clients:", err)
|
||
} else if count > 0 {
|
||
logger.Debugf("%v clients disabled", count)
|
||
disabledClientsCount = count
|
||
}
|
||
|
||
needRestart2, count, err := s.disableInvalidInbounds(tx)
|
||
if err != nil {
|
||
logger.Warning("Error in disabling invalid inbounds:", err)
|
||
} else if count > 0 {
|
||
logger.Debugf("%v inbounds disabled", count)
|
||
}
|
||
return needRestart0 || needRestart1 || needRestart2, disabledClientsCount > 0, nil
|
||
}
|
||
|
||
func (s *InboundService) addInboundTraffic(tx *gorm.DB, traffics []*xray.Traffic) error {
|
||
if len(traffics) == 0 {
|
||
return nil
|
||
}
|
||
|
||
var err error
|
||
|
||
for _, traffic := range traffics {
|
||
if traffic.IsInbound {
|
||
err = tx.Model(&model.Inbound{}).Where("tag = ? AND node_id IS NULL", traffic.Tag).
|
||
Updates(map[string]any{
|
||
"up": gorm.Expr("up + ?", traffic.Up),
|
||
"down": gorm.Expr("down + ?", traffic.Down),
|
||
}).Error
|
||
if err != nil {
|
||
return err
|
||
}
|
||
}
|
||
}
|
||
return nil
|
||
}
|
||
|
||
func (s *InboundService) addClientTraffic(tx *gorm.DB, traffics []*xray.ClientTraffic) (err error) {
|
||
if len(traffics) == 0 {
|
||
return nil
|
||
}
|
||
|
||
emails := make([]string, 0, len(traffics))
|
||
for _, traffic := range traffics {
|
||
emails = append(emails, traffic.Email)
|
||
}
|
||
dbClientTraffics := make([]*xray.ClientTraffic, 0, len(traffics))
|
||
err = tx.Model(xray.ClientTraffic{}).
|
||
Where("email IN (?) AND inbound_id IN (?)", emails,
|
||
tx.Model(&model.Inbound{}).Select("id").Where("node_id IS NULL")).
|
||
Find(&dbClientTraffics).Error
|
||
if err != nil {
|
||
return err
|
||
}
|
||
|
||
// Avoid empty slice error
|
||
if len(dbClientTraffics) == 0 {
|
||
return nil
|
||
}
|
||
|
||
dbClientTraffics, err = s.adjustTraffics(tx, dbClientTraffics)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
|
||
// Index by email for O(N) merge — the previous nested loop was O(N²)
|
||
// and dominated each cron tick on inbounds with thousands of active
|
||
// clients (7500 × 7500 = 56M string comparisons every 10 seconds).
|
||
trafficByEmail := make(map[string]*xray.ClientTraffic, len(traffics))
|
||
for i := range traffics {
|
||
if traffics[i] != nil {
|
||
trafficByEmail[traffics[i].Email] = traffics[i]
|
||
}
|
||
}
|
||
now := time.Now().UnixMilli()
|
||
for dbTraffic_index := range dbClientTraffics {
|
||
t, ok := trafficByEmail[dbClientTraffics[dbTraffic_index].Email]
|
||
if !ok {
|
||
continue
|
||
}
|
||
dbClientTraffics[dbTraffic_index].Up += t.Up
|
||
dbClientTraffics[dbTraffic_index].Down += t.Down
|
||
if t.Up+t.Down > 0 {
|
||
dbClientTraffics[dbTraffic_index].LastOnline = now
|
||
}
|
||
}
|
||
|
||
err = tx.Save(dbClientTraffics).Error
|
||
if err != nil {
|
||
logger.Warning("AddClientTraffic update data ", err)
|
||
}
|
||
|
||
return nil
|
||
}
|
||
|
||
func (s *InboundService) adjustTraffics(tx *gorm.DB, dbClientTraffics []*xray.ClientTraffic) ([]*xray.ClientTraffic, error) {
|
||
inboundIds := make([]int, 0, len(dbClientTraffics))
|
||
for _, dbClientTraffic := range dbClientTraffics {
|
||
if dbClientTraffic.ExpiryTime < 0 {
|
||
inboundIds = append(inboundIds, dbClientTraffic.InboundId)
|
||
}
|
||
}
|
||
|
||
if len(inboundIds) > 0 {
|
||
var inbounds []*model.Inbound
|
||
err := tx.Model(model.Inbound{}).Where("id IN (?)", inboundIds).Find(&inbounds).Error
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
for inbound_index := range inbounds {
|
||
settings := map[string]any{}
|
||
json.Unmarshal([]byte(inbounds[inbound_index].Settings), &settings)
|
||
clients, ok := settings["clients"].([]any)
|
||
if ok {
|
||
var newClients []any
|
||
for client_index := range clients {
|
||
c := clients[client_index].(map[string]any)
|
||
for traffic_index := range dbClientTraffics {
|
||
if dbClientTraffics[traffic_index].ExpiryTime < 0 && c["email"] == dbClientTraffics[traffic_index].Email {
|
||
oldExpiryTime := c["expiryTime"].(float64)
|
||
newExpiryTime := (time.Now().Unix() * 1000) - int64(oldExpiryTime)
|
||
c["expiryTime"] = newExpiryTime
|
||
c["updated_at"] = time.Now().Unix() * 1000
|
||
dbClientTraffics[traffic_index].ExpiryTime = newExpiryTime
|
||
break
|
||
}
|
||
}
|
||
// Backfill created_at and updated_at
|
||
if _, ok := c["created_at"]; !ok {
|
||
c["created_at"] = time.Now().Unix() * 1000
|
||
}
|
||
c["updated_at"] = time.Now().Unix() * 1000
|
||
newClients = append(newClients, any(c))
|
||
}
|
||
settings["clients"] = newClients
|
||
modifiedSettings, err := json.MarshalIndent(settings, "", " ")
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
|
||
inbounds[inbound_index].Settings = string(modifiedSettings)
|
||
}
|
||
}
|
||
err = tx.Save(inbounds).Error
|
||
if err != nil {
|
||
logger.Warning("AddClientTraffic update inbounds ", err)
|
||
logger.Error(inbounds)
|
||
} else {
|
||
for _, ib := range inbounds {
|
||
if ib == nil {
|
||
continue
|
||
}
|
||
cs, gcErr := s.GetClients(ib)
|
||
if gcErr != nil {
|
||
logger.Warning("AddClientTraffic sync clients: GetClients failed", gcErr)
|
||
continue
|
||
}
|
||
if syncErr := s.clientService.SyncInbound(tx, ib.Id, cs); syncErr != nil {
|
||
logger.Warning("AddClientTraffic sync clients: SyncInbound failed", syncErr)
|
||
}
|
||
}
|
||
}
|
||
}
|
||
|
||
return dbClientTraffics, nil
|
||
}
|
||
|
||
func (s *InboundService) autoRenewClients(tx *gorm.DB) (bool, int64, error) {
|
||
// check for time expired
|
||
var traffics []*xray.ClientTraffic
|
||
now := time.Now().Unix() * 1000
|
||
var err, err1 error
|
||
|
||
err = tx.Model(xray.ClientTraffic{}).
|
||
Where("reset > 0 and expiry_time > 0 and expiry_time <= ?", now).
|
||
Where("inbound_id IN (?)", tx.Model(&model.Inbound{}).Select("id").Where("node_id IS NULL")).
|
||
Find(&traffics).Error
|
||
if err != nil {
|
||
return false, 0, err
|
||
}
|
||
// return if there is no client to renew
|
||
if len(traffics) == 0 {
|
||
return false, 0, nil
|
||
}
|
||
|
||
var inbound_ids []int
|
||
var inbounds []*model.Inbound
|
||
needRestart := false
|
||
var clientsToAdd []struct {
|
||
protocol string
|
||
tag string
|
||
client map[string]any
|
||
}
|
||
|
||
for _, traffic := range traffics {
|
||
inbound_ids = append(inbound_ids, traffic.InboundId)
|
||
}
|
||
// Dedupe so an inbound hosting N expired clients is fetched and saved once
|
||
// per tick instead of N times across chunk boundaries.
|
||
inbound_ids = uniqueInts(inbound_ids)
|
||
// Chunked to stay under SQLite's bind-variable limit when many inbounds
|
||
// are touched in a single tick.
|
||
for _, batch := range chunkInts(inbound_ids, sqliteMaxVars) {
|
||
var page []*model.Inbound
|
||
if err = tx.Model(model.Inbound{}).Where("id IN ?", batch).Find(&page).Error; err != nil {
|
||
return false, 0, err
|
||
}
|
||
inbounds = append(inbounds, page...)
|
||
}
|
||
for inbound_index := range inbounds {
|
||
settings := map[string]any{}
|
||
json.Unmarshal([]byte(inbounds[inbound_index].Settings), &settings)
|
||
clients := settings["clients"].([]any)
|
||
for client_index := range clients {
|
||
c := clients[client_index].(map[string]any)
|
||
for traffic_index, traffic := range traffics {
|
||
if traffic.Email == c["email"].(string) {
|
||
newExpiryTime := traffic.ExpiryTime
|
||
for newExpiryTime < now {
|
||
newExpiryTime += (int64(traffic.Reset) * 86400000)
|
||
}
|
||
c["expiryTime"] = newExpiryTime
|
||
traffics[traffic_index].ExpiryTime = newExpiryTime
|
||
traffics[traffic_index].Down = 0
|
||
traffics[traffic_index].Up = 0
|
||
if !traffic.Enable {
|
||
traffics[traffic_index].Enable = true
|
||
c["enable"] = true
|
||
clientsToAdd = append(clientsToAdd,
|
||
struct {
|
||
protocol string
|
||
tag string
|
||
client map[string]any
|
||
}{
|
||
protocol: string(inbounds[inbound_index].Protocol),
|
||
tag: inbounds[inbound_index].Tag,
|
||
client: c,
|
||
})
|
||
}
|
||
clients[client_index] = any(c)
|
||
break
|
||
}
|
||
}
|
||
}
|
||
settings["clients"] = clients
|
||
newSettings, err := json.MarshalIndent(settings, "", " ")
|
||
if err != nil {
|
||
return false, 0, err
|
||
}
|
||
inbounds[inbound_index].Settings = string(newSettings)
|
||
}
|
||
err = tx.Save(inbounds).Error
|
||
if err != nil {
|
||
return false, 0, err
|
||
}
|
||
for _, ib := range inbounds {
|
||
if ib == nil {
|
||
continue
|
||
}
|
||
cs, gcErr := s.GetClients(ib)
|
||
if gcErr != nil {
|
||
logger.Warning("autoRenewClients sync clients: GetClients failed", gcErr)
|
||
continue
|
||
}
|
||
if syncErr := s.clientService.SyncInbound(tx, ib.Id, cs); syncErr != nil {
|
||
logger.Warning("autoRenewClients sync clients: SyncInbound failed", syncErr)
|
||
}
|
||
}
|
||
err = tx.Save(traffics).Error
|
||
if err != nil {
|
||
return false, 0, err
|
||
}
|
||
if p != nil {
|
||
err1 = s.xrayApi.Init(p.GetAPIPort())
|
||
if err1 != nil {
|
||
return true, int64(len(traffics)), nil
|
||
}
|
||
for _, clientToAdd := range clientsToAdd {
|
||
err1 = s.xrayApi.AddUser(clientToAdd.protocol, clientToAdd.tag, clientToAdd.client)
|
||
if err1 != nil {
|
||
needRestart = true
|
||
}
|
||
}
|
||
s.xrayApi.Close()
|
||
}
|
||
return needRestart, int64(len(traffics)), nil
|
||
}
|
||
|
||
func (s *InboundService) disableInvalidInbounds(tx *gorm.DB) (bool, int64, error) {
|
||
now := time.Now().Unix() * 1000
|
||
needRestart := false
|
||
|
||
if p != nil {
|
||
var tags []string
|
||
err := tx.Table("inbounds").
|
||
Select("inbounds.tag").
|
||
Where("((total > 0 and up + down >= total) or (expiry_time > 0 and expiry_time <= ?)) and enable = ? and node_id IS NULL", now, true).
|
||
Scan(&tags).Error
|
||
if err != nil {
|
||
return false, 0, err
|
||
}
|
||
s.xrayApi.Init(p.GetAPIPort())
|
||
for _, tag := range tags {
|
||
err1 := s.xrayApi.DelInbound(tag)
|
||
if err1 == nil {
|
||
logger.Debug("Inbound disabled by api:", tag)
|
||
} else {
|
||
logger.Debug("Error in disabling inbound by api:", err1)
|
||
needRestart = true
|
||
}
|
||
}
|
||
s.xrayApi.Close()
|
||
}
|
||
|
||
result := tx.Model(model.Inbound{}).
|
||
Where("((total > 0 and up + down >= total) or (expiry_time > 0 and expiry_time <= ?)) and enable = ? and node_id IS NULL", now, true).
|
||
Update("enable", false)
|
||
err := result.Error
|
||
count := result.RowsAffected
|
||
return needRestart, count, err
|
||
}
|
||
|
||
func (s *InboundService) disableInvalidClients(tx *gorm.DB) (bool, int64, error) {
|
||
now := time.Now().Unix() * 1000
|
||
needRestart := false
|
||
|
||
var depletedRows []xray.ClientTraffic
|
||
err := tx.Model(xray.ClientTraffic{}).
|
||
Where("((total > 0 AND up + down >= total) OR (expiry_time > 0 AND expiry_time <= ?)) AND enable = ?", now, true).
|
||
Find(&depletedRows).Error
|
||
if err != nil {
|
||
return false, 0, err
|
||
}
|
||
if len(depletedRows) == 0 {
|
||
return false, 0, nil
|
||
}
|
||
|
||
depletedEmails := make([]string, 0, len(depletedRows))
|
||
for i := range depletedRows {
|
||
if depletedRows[i].Email == "" {
|
||
continue
|
||
}
|
||
depletedEmails = append(depletedEmails, depletedRows[i].Email)
|
||
}
|
||
|
||
type target struct {
|
||
InboundID int `gorm:"column:inbound_id"`
|
||
NodeID *int `gorm:"column:node_id"`
|
||
Tag string
|
||
Email string
|
||
}
|
||
var targets []target
|
||
if len(depletedEmails) > 0 {
|
||
err = tx.Raw(`
|
||
SELECT inbounds.id AS inbound_id, inbounds.node_id AS node_id,
|
||
inbounds.tag AS tag, clients.email AS email
|
||
FROM clients
|
||
JOIN client_inbounds ON client_inbounds.client_id = clients.id
|
||
JOIN inbounds ON inbounds.id = client_inbounds.inbound_id
|
||
WHERE clients.email IN ?
|
||
`, depletedEmails).Scan(&targets).Error
|
||
if err != nil {
|
||
return false, 0, err
|
||
}
|
||
}
|
||
|
||
var localTargets []target
|
||
localByInbound := make(map[int]map[string]struct{})
|
||
remoteByInbound := make(map[int][]target)
|
||
for _, t := range targets {
|
||
if t.NodeID == nil {
|
||
localTargets = append(localTargets, t)
|
||
if localByInbound[t.InboundID] == nil {
|
||
localByInbound[t.InboundID] = make(map[string]struct{})
|
||
}
|
||
localByInbound[t.InboundID][t.Email] = struct{}{}
|
||
} else {
|
||
remoteByInbound[t.InboundID] = append(remoteByInbound[t.InboundID], t)
|
||
}
|
||
}
|
||
|
||
if p != nil && len(localTargets) > 0 {
|
||
s.xrayApi.Init(p.GetAPIPort())
|
||
for _, t := range localTargets {
|
||
err1 := s.xrayApi.RemoveUser(t.Tag, t.Email)
|
||
if err1 == nil {
|
||
logger.Debug("Client disabled by api:", t.Email)
|
||
} else if strings.Contains(err1.Error(), fmt.Sprintf("User %s not found.", t.Email)) {
|
||
logger.Debug("User is already disabled. Nothing to do more...")
|
||
} else {
|
||
logger.Debug("Error in disabling client by api:", err1)
|
||
needRestart = true
|
||
}
|
||
}
|
||
s.xrayApi.Close()
|
||
}
|
||
|
||
for inboundID, emails := range localByInbound {
|
||
if _, _, mErr := s.markClientsDisabledInSettings(tx, inboundID, emails); mErr != nil {
|
||
logger.Warning("disableInvalidClients: settings.JSON sync failed for inbound", inboundID, ":", mErr)
|
||
}
|
||
}
|
||
|
||
result := tx.Model(xray.ClientTraffic{}).
|
||
Where("((total > 0 and up + down >= total) or (expiry_time > 0 and expiry_time <= ?)) and enable = ?", now, true).
|
||
Update("enable", false)
|
||
err = result.Error
|
||
count := result.RowsAffected
|
||
if err != nil {
|
||
return needRestart, count, err
|
||
}
|
||
|
||
if len(depletedEmails) > 0 {
|
||
if err := tx.Model(&model.ClientRecord{}).
|
||
Where("email IN ?", depletedEmails).
|
||
Updates(map[string]any{"enable": false, "updated_at": now}).Error; err != nil {
|
||
logger.Warning("disableInvalidClients update clients.enable:", err)
|
||
}
|
||
}
|
||
|
||
for inboundID, group := range remoteByInbound {
|
||
emails := make(map[string]struct{}, len(group))
|
||
for _, t := range group {
|
||
emails[t.Email] = struct{}{}
|
||
}
|
||
if pushErr := s.disableRemoteClients(tx, inboundID, emails); pushErr != nil {
|
||
logger.Warning("disableInvalidClients: push to remote failed for inbound", inboundID, ":", pushErr)
|
||
needRestart = true
|
||
}
|
||
}
|
||
|
||
return needRestart, count, nil
|
||
}
|
||
|
||
// markClientsDisabledInSettings flips client.enable=false in the inbound's
|
||
// stored settings JSON for the given emails and returns both the pre and
|
||
// post snapshots so a caller pushing to a remote node has the diff to hand.
|
||
func (s *InboundService) markClientsDisabledInSettings(tx *gorm.DB, inboundID int, emails map[string]struct{}) (oldIb, newIb *model.Inbound, err error) {
|
||
var ib model.Inbound
|
||
if err := tx.Model(&model.Inbound{}).Where("id = ?", inboundID).First(&ib).Error; err != nil {
|
||
return nil, nil, err
|
||
}
|
||
snapshot := ib
|
||
|
||
settings := map[string]any{}
|
||
if err := json.Unmarshal([]byte(ib.Settings), &settings); err != nil {
|
||
return nil, nil, err
|
||
}
|
||
clients, _ := settings["clients"].([]any)
|
||
now := time.Now().Unix() * 1000
|
||
mutated := false
|
||
for i := range clients {
|
||
entry, ok := clients[i].(map[string]any)
|
||
if !ok {
|
||
continue
|
||
}
|
||
email, _ := entry["email"].(string)
|
||
if _, hit := emails[email]; !hit {
|
||
continue
|
||
}
|
||
if cur, _ := entry["enable"].(bool); cur == false {
|
||
continue
|
||
}
|
||
entry["enable"] = false
|
||
entry["updated_at"] = now
|
||
clients[i] = entry
|
||
mutated = true
|
||
}
|
||
if !mutated {
|
||
return &snapshot, &ib, nil
|
||
}
|
||
settings["clients"] = clients
|
||
bs, marshalErr := json.MarshalIndent(settings, "", " ")
|
||
if marshalErr != nil {
|
||
return nil, nil, marshalErr
|
||
}
|
||
ib.Settings = string(bs)
|
||
if err := tx.Model(&model.Inbound{}).Where("id = ?", inboundID).
|
||
Update("settings", ib.Settings).Error; err != nil {
|
||
return nil, nil, err
|
||
}
|
||
return &snapshot, &ib, nil
|
||
}
|
||
|
||
func (s *InboundService) disableRemoteClients(tx *gorm.DB, inboundID int, emails map[string]struct{}) error {
|
||
oldSnapshot, ib, err := s.markClientsDisabledInSettings(tx, inboundID, emails)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
|
||
rt, err := s.runtimeFor(ib)
|
||
if err != nil {
|
||
return err
|
||
}
|
||
if err := rt.UpdateInbound(context.Background(), oldSnapshot, ib); err != nil {
|
||
return err
|
||
}
|
||
return nil
|
||
}
|
||
|
||
func (s *InboundService) GetInboundTags() (string, error) {
|
||
db := database.GetDB()
|
||
var inboundTags []string
|
||
err := db.Model(model.Inbound{}).Select("tag").Find(&inboundTags).Error
|
||
if err != nil && err != gorm.ErrRecordNotFound {
|
||
return "", err
|
||
}
|
||
tags, _ := json.Marshal(inboundTags)
|
||
return string(tags), nil
|
||
}
|
||
|
||
func (s *InboundService) GetClientReverseTags() (string, error) {
|
||
db := database.GetDB()
|
||
var inbounds []model.Inbound
|
||
err := db.Model(model.Inbound{}).Select("settings").Where("protocol = ?", "vless").Find(&inbounds).Error
|
||
if err != nil && err != gorm.ErrRecordNotFound {
|
||
return "[]", err
|
||
}
|
||
|
||
tagSet := make(map[string]struct{})
|
||
for _, inbound := range inbounds {
|
||
var settings map[string]any
|
||
if err := json.Unmarshal([]byte(inbound.Settings), &settings); err != nil {
|
||
continue
|
||
}
|
||
clients, ok := settings["clients"].([]any)
|
||
if !ok {
|
||
continue
|
||
}
|
||
for _, client := range clients {
|
||
clientMap, ok := client.(map[string]any)
|
||
if !ok {
|
||
continue
|
||
}
|
||
reverse, ok := clientMap["reverse"].(map[string]any)
|
||
if !ok {
|
||
continue
|
||
}
|
||
tag, _ := reverse["tag"].(string)
|
||
tag = strings.TrimSpace(tag)
|
||
if tag != "" {
|
||
tagSet[tag] = struct{}{}
|
||
}
|
||
}
|
||
}
|
||
|
||
rawTags := make([]string, 0, len(tagSet))
|
||
for tag := range tagSet {
|
||
rawTags = append(rawTags, tag)
|
||
}
|
||
sort.Strings(rawTags)
|
||
|
||
result, _ := json.Marshal(rawTags)
|
||
return string(result), nil
|
||
}
|
||
|
||
func (s *InboundService) MigrationRemoveOrphanedTraffics() {
|
||
db := database.GetDB()
|
||
query := fmt.Sprintf(
|
||
"DELETE FROM client_traffics WHERE email NOT IN (SELECT %s %s)",
|
||
database.JSONFieldText("client.value", "email"),
|
||
database.JSONClientsFromInbound(),
|
||
)
|
||
db.Exec(query)
|
||
}
|
||
|
||
// AddClientStat inserts a per-client accounting row, no-op on email
|
||
// conflict. Xray reports traffic per email, so the surviving row acts as
|
||
// the shared accumulator for inbounds that re-use the same identity.
|
||
func (s *InboundService) AddClientStat(tx *gorm.DB, inboundId int, client *model.Client) error {
|
||
clientTraffic := xray.ClientTraffic{
|
||
InboundId: inboundId,
|
||
Email: client.Email,
|
||
Total: client.TotalGB,
|
||
ExpiryTime: client.ExpiryTime,
|
||
Enable: client.Enable,
|
||
Reset: client.Reset,
|
||
}
|
||
return tx.Clauses(clause.OnConflict{Columns: []clause.Column{{Name: "email"}}, DoNothing: true}).
|
||
Create(&clientTraffic).Error
|
||
}
|
||
|
||
func (s *InboundService) UpdateClientStat(tx *gorm.DB, email string, client *model.Client) error {
|
||
result := tx.Model(xray.ClientTraffic{}).
|
||
Where("email = ?", email).
|
||
Updates(map[string]any{
|
||
"enable": client.Enable,
|
||
"email": client.Email,
|
||
"total": client.TotalGB,
|
||
"expiry_time": client.ExpiryTime,
|
||
"reset": client.Reset,
|
||
})
|
||
err := result.Error
|
||
return err
|
||
}
|
||
|
||
func (s *InboundService) UpdateClientIPs(tx *gorm.DB, oldEmail string, newEmail string) error {
|
||
return tx.Model(model.InboundClientIps{}).Where("client_email = ?", oldEmail).Update("client_email", newEmail).Error
|
||
}
|
||
|
||
func (s *InboundService) DelClientStat(tx *gorm.DB, email string) error {
|
||
return tx.Where("email = ?", email).Delete(xray.ClientTraffic{}).Error
|
||
}
|
||
|
||
func (s *InboundService) DelClientIPs(tx *gorm.DB, email string) error {
|
||
return tx.Where("client_email = ?", email).Delete(model.InboundClientIps{}).Error
|
||
}
|
||
|
||
func (s *InboundService) GetClientInboundByTrafficID(trafficId int) (traffic *xray.ClientTraffic, inbound *model.Inbound, err error) {
|
||
db := database.GetDB()
|
||
var traffics []*xray.ClientTraffic
|
||
err = db.Model(xray.ClientTraffic{}).Where("id = ?", trafficId).Find(&traffics).Error
|
||
if err != nil {
|
||
logger.Warningf("Error retrieving ClientTraffic with trafficId %d: %v", trafficId, err)
|
||
return nil, nil, err
|
||
}
|
||
if len(traffics) > 0 {
|
||
inbound, err = s.GetInbound(traffics[0].InboundId)
|
||
return traffics[0], inbound, err
|
||
}
|
||
return nil, nil, nil
|
||
}
|
||
|
||
func (s *InboundService) GetClientInboundByEmail(email string) (traffic *xray.ClientTraffic, inbound *model.Inbound, err error) {
|
||
db := database.GetDB()
|
||
var traffics []*xray.ClientTraffic
|
||
err = db.Model(xray.ClientTraffic{}).Where("email = ?", email).Find(&traffics).Error
|
||
if err != nil {
|
||
logger.Warningf("Error retrieving ClientTraffic with email %s: %v", email, err)
|
||
return nil, nil, err
|
||
}
|
||
if len(traffics) > 0 {
|
||
inbound, err = s.GetInbound(traffics[0].InboundId)
|
||
return traffics[0], inbound, err
|
||
}
|
||
return nil, nil, nil
|
||
}
|
||
|
||
func (s *InboundService) GetClientByEmail(clientEmail string) (*xray.ClientTraffic, *model.Client, error) {
|
||
traffic, inbound, err := s.GetClientInboundByEmail(clientEmail)
|
||
if err != nil {
|
||
return nil, nil, err
|
||
}
|
||
if inbound == nil {
|
||
return nil, nil, common.NewError("Inbound Not Found For Email:", clientEmail)
|
||
}
|
||
|
||
clients, err := s.GetClients(inbound)
|
||
if err != nil {
|
||
return nil, nil, err
|
||
}
|
||
|
||
for _, client := range clients {
|
||
if client.Email == clientEmail {
|
||
return traffic, &client, nil
|
||
}
|
||
}
|
||
|
||
return nil, nil, common.NewError("Client Not Found In Inbound For Email:", clientEmail)
|
||
}
|
||
|
||
func (s *InboundService) ResetClientTrafficByEmail(clientEmail string) error {
|
||
return submitTrafficWrite(func() error {
|
||
db := database.GetDB()
|
||
return db.Model(xray.ClientTraffic{}).
|
||
Where("email = ?", clientEmail).
|
||
Updates(map[string]any{"enable": true, "up": 0, "down": 0}).Error
|
||
})
|
||
}
|
||
|
||
func (s *InboundService) ResetClientTraffic(id int, clientEmail string) (needRestart bool, err error) {
|
||
err = submitTrafficWrite(func() error {
|
||
var inner error
|
||
needRestart, inner = s.resetClientTrafficLocked(id, clientEmail)
|
||
return inner
|
||
})
|
||
return
|
||
}
|
||
|
||
func (s *InboundService) resetClientTrafficLocked(id int, clientEmail string) (bool, error) {
|
||
needRestart := false
|
||
|
||
traffic, err := s.GetClientTrafficByEmail(clientEmail)
|
||
if err != nil {
|
||
return false, err
|
||
}
|
||
|
||
if !traffic.Enable {
|
||
inbound, err := s.GetInbound(id)
|
||
if err != nil {
|
||
return false, err
|
||
}
|
||
clients, err := s.GetClients(inbound)
|
||
if err != nil {
|
||
return false, err
|
||
}
|
||
for _, client := range clients {
|
||
if client.Email == clientEmail && client.Enable {
|
||
rt, rterr := s.runtimeFor(inbound)
|
||
if rterr != nil {
|
||
if inbound.NodeID != nil {
|
||
return false, rterr
|
||
}
|
||
needRestart = true
|
||
break
|
||
}
|
||
cipher := ""
|
||
if string(inbound.Protocol) == "shadowsocks" {
|
||
var oldSettings map[string]any
|
||
err = json.Unmarshal([]byte(inbound.Settings), &oldSettings)
|
||
if err != nil {
|
||
return false, err
|
||
}
|
||
cipher = oldSettings["method"].(string)
|
||
}
|
||
err1 := rt.AddUser(context.Background(), inbound, map[string]any{
|
||
"email": client.Email,
|
||
"id": client.ID,
|
||
"auth": client.Auth,
|
||
"security": client.Security,
|
||
"flow": client.Flow,
|
||
"password": client.Password,
|
||
"cipher": cipher,
|
||
})
|
||
if err1 == nil {
|
||
logger.Debug("Client enabled on", rt.Name(), "due to reset traffic:", clientEmail)
|
||
} else {
|
||
logger.Debug("Error in enabling client on", rt.Name(), ":", err1)
|
||
needRestart = true
|
||
}
|
||
break
|
||
}
|
||
}
|
||
}
|
||
|
||
traffic.Up = 0
|
||
traffic.Down = 0
|
||
traffic.Enable = true
|
||
|
||
db := database.GetDB()
|
||
err = db.Save(traffic).Error
|
||
if err != nil {
|
||
return false, err
|
||
}
|
||
|
||
now := time.Now().UnixMilli()
|
||
_ = db.Model(model.Inbound{}).
|
||
Where("id = ?", id).
|
||
Update("last_traffic_reset_time", now).Error
|
||
|
||
inbound, err := s.GetInbound(id)
|
||
if err == nil && inbound != nil && inbound.NodeID != nil {
|
||
if rt, rterr := s.runtimeFor(inbound); rterr == nil {
|
||
if e := rt.ResetClientTraffic(context.Background(), inbound, clientEmail); e != nil {
|
||
logger.Warning("ResetClientTraffic: remote propagation to", rt.Name(), "failed:", e)
|
||
}
|
||
} else {
|
||
logger.Warning("ResetClientTraffic: runtime lookup failed:", rterr)
|
||
}
|
||
}
|
||
|
||
return needRestart, nil
|
||
}
|
||
|
||
func (s *InboundService) ResetAllTraffics() error {
|
||
return submitTrafficWrite(func() error {
|
||
return s.resetAllTrafficsLocked()
|
||
})
|
||
}
|
||
|
||
func (s *InboundService) resetAllTrafficsLocked() error {
|
||
db := database.GetDB()
|
||
now := time.Now().UnixMilli()
|
||
|
||
if err := db.Model(model.Inbound{}).
|
||
Where("user_id > ?", 0).
|
||
Updates(map[string]any{
|
||
"up": 0,
|
||
"down": 0,
|
||
"last_traffic_reset_time": now,
|
||
}).Error; err != nil {
|
||
return err
|
||
}
|
||
|
||
return nil
|
||
}
|
||
|
||
func (s *InboundService) ResetInboundTraffic(id int) error {
|
||
return submitTrafficWrite(func() error {
|
||
db := database.GetDB()
|
||
return db.Model(model.Inbound{}).
|
||
Where("id = ?", id).
|
||
Updates(map[string]any{"up": 0, "down": 0}).Error
|
||
})
|
||
}
|
||
|
||
func (s *InboundService) DelDepletedClients(id int) (err error) {
|
||
db := database.GetDB()
|
||
tx := db.Begin()
|
||
defer func() {
|
||
if err == nil {
|
||
tx.Commit()
|
||
} else {
|
||
tx.Rollback()
|
||
}
|
||
}()
|
||
|
||
// Collect depleted emails globally — a shared-email row owned by one
|
||
// inbound depletes every sibling that lists the email.
|
||
now := time.Now().Unix() * 1000
|
||
depletedClause := "reset = 0 and ((total > 0 and up + down >= total) or (expiry_time > 0 and expiry_time <= ?))"
|
||
var depletedRows []xray.ClientTraffic
|
||
err = db.Model(xray.ClientTraffic{}).
|
||
Where(depletedClause, now).
|
||
Find(&depletedRows).Error
|
||
if err != nil {
|
||
return err
|
||
}
|
||
if len(depletedRows) == 0 {
|
||
return nil
|
||
}
|
||
|
||
depletedEmails := make(map[string]struct{}, len(depletedRows))
|
||
for _, r := range depletedRows {
|
||
if r.Email == "" {
|
||
continue
|
||
}
|
||
depletedEmails[strings.ToLower(r.Email)] = struct{}{}
|
||
}
|
||
if len(depletedEmails) == 0 {
|
||
return nil
|
||
}
|
||
|
||
var inbounds []*model.Inbound
|
||
inboundQuery := db.Model(model.Inbound{})
|
||
if id >= 0 {
|
||
inboundQuery = inboundQuery.Where("id = ?", id)
|
||
}
|
||
if err = inboundQuery.Find(&inbounds).Error; err != nil {
|
||
return err
|
||
}
|
||
|
||
for _, inbound := range inbounds {
|
||
var settings map[string]any
|
||
if err = json.Unmarshal([]byte(inbound.Settings), &settings); err != nil {
|
||
return err
|
||
}
|
||
rawClients, ok := settings["clients"].([]any)
|
||
if !ok {
|
||
continue
|
||
}
|
||
newClients := make([]any, 0, len(rawClients))
|
||
removed := 0
|
||
for _, client := range rawClients {
|
||
c, ok := client.(map[string]any)
|
||
if !ok {
|
||
newClients = append(newClients, client)
|
||
continue
|
||
}
|
||
email, _ := c["email"].(string)
|
||
if _, isDepleted := depletedEmails[strings.ToLower(email)]; isDepleted {
|
||
removed++
|
||
continue
|
||
}
|
||
newClients = append(newClients, client)
|
||
}
|
||
if removed == 0 {
|
||
continue
|
||
}
|
||
if len(newClients) == 0 {
|
||
s.DelInbound(inbound.Id)
|
||
continue
|
||
}
|
||
settings["clients"] = newClients
|
||
ns, mErr := json.MarshalIndent(settings, "", " ")
|
||
if mErr != nil {
|
||
return mErr
|
||
}
|
||
inbound.Settings = string(ns)
|
||
if err = tx.Save(inbound).Error; err != nil {
|
||
return err
|
||
}
|
||
survivingClients, gcErr := s.GetClients(inbound)
|
||
if gcErr != nil {
|
||
err = gcErr
|
||
return err
|
||
}
|
||
if err = s.clientService.SyncInbound(tx, inbound.Id, survivingClients); err != nil {
|
||
return err
|
||
}
|
||
}
|
||
|
||
// Drop now-orphaned rows. With id >= 0, a row is safe to drop only when
|
||
// no out-of-scope inbound still references the email.
|
||
if id < 0 {
|
||
err = tx.Where(depletedClause, now).Delete(xray.ClientTraffic{}).Error
|
||
return err
|
||
}
|
||
emails := make([]string, 0, len(depletedEmails))
|
||
for e := range depletedEmails {
|
||
emails = append(emails, e)
|
||
}
|
||
var stillReferenced []string
|
||
emailExpr := database.JSONFieldText("client.value", "email")
|
||
stillQuery := fmt.Sprintf(
|
||
"SELECT DISTINCT LOWER(%s) %s WHERE LOWER(%s) IN ?",
|
||
emailExpr,
|
||
database.JSONClientsFromInbound(),
|
||
emailExpr,
|
||
)
|
||
if err = tx.Raw(stillQuery, emails).Scan(&stillReferenced).Error; err != nil {
|
||
return err
|
||
}
|
||
stillSet := make(map[string]struct{}, len(stillReferenced))
|
||
for _, e := range stillReferenced {
|
||
stillSet[e] = struct{}{}
|
||
}
|
||
toDelete := make([]string, 0, len(emails))
|
||
for _, e := range emails {
|
||
if _, kept := stillSet[e]; !kept {
|
||
toDelete = append(toDelete, e)
|
||
}
|
||
}
|
||
if len(toDelete) > 0 {
|
||
if err = tx.Where("LOWER(email) IN ?", toDelete).Delete(xray.ClientTraffic{}).Error; err != nil {
|
||
return err
|
||
}
|
||
}
|
||
return nil
|
||
}
|
||
|
||
func (s *InboundService) GetClientTrafficTgBot(tgId int64) ([]*xray.ClientTraffic, error) {
|
||
db := database.GetDB()
|
||
var inbounds []*model.Inbound
|
||
|
||
// Retrieve inbounds where settings contain the given tgId
|
||
err := db.Model(model.Inbound{}).Where("settings LIKE ?", fmt.Sprintf(`%%"tgId": %d%%`, tgId)).Find(&inbounds).Error
|
||
if err != nil && err != gorm.ErrRecordNotFound {
|
||
logger.Errorf("Error retrieving inbounds with tgId %d: %v", tgId, err)
|
||
return nil, err
|
||
}
|
||
|
||
var emails []string
|
||
for _, inbound := range inbounds {
|
||
clients, err := s.GetClients(inbound)
|
||
if err != nil {
|
||
logger.Errorf("Error retrieving clients for inbound %d: %v", inbound.Id, err)
|
||
continue
|
||
}
|
||
for _, client := range clients {
|
||
if client.TgID == tgId {
|
||
emails = append(emails, client.Email)
|
||
}
|
||
}
|
||
}
|
||
|
||
// Chunked to stay under SQLite's bind-variable limit when a single Telegram
|
||
// account owns thousands of clients across inbounds.
|
||
uniqEmails := uniqueNonEmptyStrings(emails)
|
||
traffics := make([]*xray.ClientTraffic, 0, len(uniqEmails))
|
||
for _, batch := range chunkStrings(uniqEmails, sqliteMaxVars) {
|
||
var page []*xray.ClientTraffic
|
||
if err = db.Model(xray.ClientTraffic{}).Where("email IN ?", batch).Find(&page).Error; err != nil {
|
||
if err == gorm.ErrRecordNotFound {
|
||
continue
|
||
}
|
||
logger.Errorf("Error retrieving ClientTraffic for emails %v: %v", batch, err)
|
||
return nil, err
|
||
}
|
||
traffics = append(traffics, page...)
|
||
}
|
||
if len(traffics) == 0 {
|
||
logger.Warning("No ClientTraffic records found for emails:", emails)
|
||
return nil, nil
|
||
}
|
||
|
||
// Populate UUID and other client data for each traffic record
|
||
for i := range traffics {
|
||
if ct, client, e := s.GetClientByEmail(traffics[i].Email); e == nil && ct != nil && client != nil {
|
||
traffics[i].Enable = client.Enable
|
||
traffics[i].UUID = client.ID
|
||
traffics[i].SubId = client.SubID
|
||
}
|
||
}
|
||
|
||
return traffics, nil
|
||
}
|
||
|
||
// sqliteMaxVars is a safe ceiling for the number of bind parameters in a
|
||
// single SQL statement. SQLite's SQLITE_MAX_VARIABLE_NUMBER is 999 on builds
|
||
// before 3.32 and 32766 after; staying under 999 keeps queries portable
|
||
// across forks/old binaries and also bounds per-query memory on truly large
|
||
// installs (>32k clients) where even modern SQLite would refuse a single IN.
|
||
const sqliteMaxVars = 900
|
||
|
||
// uniqueNonEmptyStrings returns a deduplicated copy of in with empty strings
|
||
// removed, preserving the order of first occurrence.
|
||
func uniqueNonEmptyStrings(in []string) []string {
|
||
if len(in) == 0 {
|
||
return nil
|
||
}
|
||
seen := make(map[string]struct{}, len(in))
|
||
out := make([]string, 0, len(in))
|
||
for _, v := range in {
|
||
if v == "" {
|
||
continue
|
||
}
|
||
if _, ok := seen[v]; ok {
|
||
continue
|
||
}
|
||
seen[v] = struct{}{}
|
||
out = append(out, v)
|
||
}
|
||
return out
|
||
}
|
||
|
||
// uniqueInts returns a deduplicated copy of in, preserving order of first occurrence.
|
||
func uniqueInts(in []int) []int {
|
||
if len(in) == 0 {
|
||
return nil
|
||
}
|
||
seen := make(map[int]struct{}, len(in))
|
||
out := make([]int, 0, len(in))
|
||
for _, v := range in {
|
||
if _, ok := seen[v]; ok {
|
||
continue
|
||
}
|
||
seen[v] = struct{}{}
|
||
out = append(out, v)
|
||
}
|
||
return out
|
||
}
|
||
|
||
// chunkStrings splits s into consecutive sub-slices of at most size elements.
|
||
// Returns nil for an empty input or non-positive size.
|
||
func chunkStrings(s []string, size int) [][]string {
|
||
if size <= 0 || len(s) == 0 {
|
||
return nil
|
||
}
|
||
out := make([][]string, 0, (len(s)+size-1)/size)
|
||
for i := 0; i < len(s); i += size {
|
||
end := min(i+size, len(s))
|
||
out = append(out, s[i:end])
|
||
}
|
||
return out
|
||
}
|
||
|
||
// chunkInts splits s into consecutive sub-slices of at most size elements.
|
||
// Returns nil for an empty input or non-positive size.
|
||
func chunkInts(s []int, size int) [][]int {
|
||
if size <= 0 || len(s) == 0 {
|
||
return nil
|
||
}
|
||
out := make([][]int, 0, (len(s)+size-1)/size)
|
||
for i := 0; i < len(s); i += size {
|
||
end := min(i+size, len(s))
|
||
out = append(out, s[i:end])
|
||
}
|
||
return out
|
||
}
|
||
|
||
func (s *InboundService) GetActiveClientTraffics(emails []string) ([]*xray.ClientTraffic, error) {
|
||
uniq := uniqueNonEmptyStrings(emails)
|
||
if len(uniq) == 0 {
|
||
return nil, nil
|
||
}
|
||
db := database.GetDB()
|
||
traffics := make([]*xray.ClientTraffic, 0, len(uniq))
|
||
for _, batch := range chunkStrings(uniq, sqliteMaxVars) {
|
||
var page []*xray.ClientTraffic
|
||
if err := db.Model(xray.ClientTraffic{}).Where("email IN ?", batch).Find(&page).Error; err != nil {
|
||
return nil, err
|
||
}
|
||
traffics = append(traffics, page...)
|
||
}
|
||
return traffics, nil
|
||
}
|
||
|
||
// GetAllClientTraffics returns the full set of client_traffics rows so the
|
||
// websocket broadcasters can ship a complete snapshot every cycle. The old
|
||
// delta-only path (GetActiveClientTraffics on activeEmails) silently dropped
|
||
// the per-client section whenever no client moved bytes in the cycle or a
|
||
// node sync failed, leaving client rows in the UI stuck at stale numbers.
|
||
func (s *InboundService) GetAllClientTraffics() ([]*xray.ClientTraffic, error) {
|
||
db := database.GetDB()
|
||
var traffics []*xray.ClientTraffic
|
||
if err := db.Model(xray.ClientTraffic{}).Find(&traffics).Error; err != nil {
|
||
return nil, err
|
||
}
|
||
return traffics, nil
|
||
}
|
||
|
||
type InboundTrafficSummary struct {
|
||
Id int `json:"id"`
|
||
Up int64 `json:"up"`
|
||
Down int64 `json:"down"`
|
||
Total int64 `json:"total"`
|
||
Enable bool `json:"enable"`
|
||
}
|
||
|
||
func (s *InboundService) GetInboundsTrafficSummary() ([]InboundTrafficSummary, error) {
|
||
db := database.GetDB()
|
||
var summaries []InboundTrafficSummary
|
||
if err := db.Model(&model.Inbound{}).
|
||
Select("id, up, down, total, enable").
|
||
Find(&summaries).Error; err != nil {
|
||
return nil, err
|
||
}
|
||
return summaries, nil
|
||
}
|
||
|
||
func (s *InboundService) GetClientTrafficByEmail(email string) (traffic *xray.ClientTraffic, err error) {
|
||
// Prefer retrieving along with client to reflect actual enabled state from inbound settings
|
||
t, client, err := s.GetClientByEmail(email)
|
||
if err != nil {
|
||
logger.Warningf("Error retrieving ClientTraffic with email %s: %v", email, err)
|
||
return nil, err
|
||
}
|
||
if t != nil && client != nil {
|
||
t.UUID = client.ID
|
||
t.SubId = client.SubID
|
||
return t, nil
|
||
}
|
||
return nil, nil
|
||
}
|
||
|
||
func (s *InboundService) UpdateClientTrafficByEmail(email string, upload int64, download int64) error {
|
||
return submitTrafficWrite(func() error {
|
||
db := database.GetDB()
|
||
err := db.Model(xray.ClientTraffic{}).
|
||
Where("email = ?", email).
|
||
Updates(map[string]any{
|
||
"up": upload,
|
||
"down": download,
|
||
}).Error
|
||
if err != nil {
|
||
logger.Warningf("Error updating ClientTraffic with email %s: %v", email, err)
|
||
}
|
||
return err
|
||
})
|
||
}
|
||
|
||
func (s *InboundService) SearchClientTraffic(query string) (traffic *xray.ClientTraffic, err error) {
|
||
db := database.GetDB()
|
||
inbound := &model.Inbound{}
|
||
traffic = &xray.ClientTraffic{}
|
||
|
||
// Search for inbound settings that contain the query
|
||
err = db.Model(model.Inbound{}).Where("settings LIKE ?", "%\""+query+"\"%").First(inbound).Error
|
||
if err != nil {
|
||
if err == gorm.ErrRecordNotFound {
|
||
logger.Warningf("Inbound settings containing query %s not found: %v", query, err)
|
||
return nil, err
|
||
}
|
||
logger.Errorf("Error searching for inbound settings with query %s: %v", query, err)
|
||
return nil, err
|
||
}
|
||
|
||
traffic.InboundId = inbound.Id
|
||
|
||
// Unmarshal settings to get clients
|
||
settings := map[string][]model.Client{}
|
||
if err := json.Unmarshal([]byte(inbound.Settings), &settings); err != nil {
|
||
logger.Errorf("Error unmarshalling inbound settings for inbound ID %d: %v", inbound.Id, err)
|
||
return nil, err
|
||
}
|
||
|
||
clients := settings["clients"]
|
||
for _, client := range clients {
|
||
if (client.ID == query || client.Password == query) && client.Email != "" {
|
||
traffic.Email = client.Email
|
||
break
|
||
}
|
||
}
|
||
|
||
if traffic.Email == "" {
|
||
logger.Warningf("No client found with query %s in inbound ID %d", query, inbound.Id)
|
||
return nil, gorm.ErrRecordNotFound
|
||
}
|
||
|
||
// Retrieve ClientTraffic based on the found email
|
||
err = db.Model(xray.ClientTraffic{}).Where("email = ?", traffic.Email).First(traffic).Error
|
||
if err != nil {
|
||
if err == gorm.ErrRecordNotFound {
|
||
logger.Warningf("ClientTraffic for email %s not found: %v", traffic.Email, err)
|
||
return nil, err
|
||
}
|
||
logger.Errorf("Error retrieving ClientTraffic for email %s: %v", traffic.Email, err)
|
||
return nil, err
|
||
}
|
||
|
||
return traffic, nil
|
||
}
|
||
|
||
func (s *InboundService) GetInboundClientIps(clientEmail string) (string, error) {
|
||
db := database.GetDB()
|
||
InboundClientIps := &model.InboundClientIps{}
|
||
err := db.Model(model.InboundClientIps{}).Where("client_email = ?", clientEmail).First(InboundClientIps).Error
|
||
if err != nil {
|
||
return "", err
|
||
}
|
||
|
||
if InboundClientIps.Ips == "" {
|
||
return "", nil
|
||
}
|
||
|
||
// Try to parse as new format (with timestamps)
|
||
type IPWithTimestamp struct {
|
||
IP string `json:"ip"`
|
||
Timestamp int64 `json:"timestamp"`
|
||
}
|
||
|
||
var ipsWithTime []IPWithTimestamp
|
||
err = json.Unmarshal([]byte(InboundClientIps.Ips), &ipsWithTime)
|
||
|
||
// If successfully parsed as new format, return with timestamps
|
||
if err == nil && len(ipsWithTime) > 0 {
|
||
return InboundClientIps.Ips, nil
|
||
}
|
||
|
||
// Otherwise, assume it's old format (simple string array)
|
||
// Try to parse as simple array and convert to new format
|
||
var oldIps []string
|
||
err = json.Unmarshal([]byte(InboundClientIps.Ips), &oldIps)
|
||
if err == nil && len(oldIps) > 0 {
|
||
// Convert old format to new format with current timestamp
|
||
newIpsWithTime := make([]IPWithTimestamp, len(oldIps))
|
||
for i, ip := range oldIps {
|
||
newIpsWithTime[i] = IPWithTimestamp{
|
||
IP: ip,
|
||
Timestamp: time.Now().Unix(),
|
||
}
|
||
}
|
||
result, _ := json.Marshal(newIpsWithTime)
|
||
return string(result), nil
|
||
}
|
||
|
||
// Return as-is if parsing fails
|
||
return InboundClientIps.Ips, nil
|
||
}
|
||
|
||
func (s *InboundService) ClearClientIps(clientEmail string) error {
|
||
db := database.GetDB()
|
||
|
||
result := db.Model(model.InboundClientIps{}).
|
||
Where("client_email = ?", clientEmail).
|
||
Update("ips", "")
|
||
err := result.Error
|
||
if err != nil {
|
||
return err
|
||
}
|
||
return nil
|
||
}
|
||
|
||
func (s *InboundService) SearchInbounds(query string) ([]*model.Inbound, error) {
|
||
db := database.GetDB()
|
||
var inbounds []*model.Inbound
|
||
err := db.Model(model.Inbound{}).Preload("ClientStats").Where("remark like ?", "%"+query+"%").Find(&inbounds).Error
|
||
if err != nil && err != gorm.ErrRecordNotFound {
|
||
return nil, err
|
||
}
|
||
return inbounds, nil
|
||
}
|
||
|
||
func (s *InboundService) MigrationRequirements() {
|
||
db := database.GetDB()
|
||
tx := db.Begin()
|
||
var err error
|
||
defer func() {
|
||
if err == nil {
|
||
tx.Commit()
|
||
if !database.IsPostgres() {
|
||
if dbErr := db.Exec(`VACUUM "main"`).Error; dbErr != nil {
|
||
logger.Warningf("VACUUM failed: %v", dbErr)
|
||
}
|
||
}
|
||
} else {
|
||
tx.Rollback()
|
||
}
|
||
}()
|
||
|
||
if tx.Migrator().HasColumn(&model.Inbound{}, "all_time") {
|
||
if err = tx.Migrator().DropColumn(&model.Inbound{}, "all_time"); err != nil {
|
||
return
|
||
}
|
||
}
|
||
if tx.Migrator().HasColumn(&xray.ClientTraffic{}, "all_time") {
|
||
if err = tx.Migrator().DropColumn(&xray.ClientTraffic{}, "all_time"); err != nil {
|
||
return
|
||
}
|
||
}
|
||
|
||
// Fix inbounds based problems
|
||
var inbounds []*model.Inbound
|
||
err = tx.Model(model.Inbound{}).Where("protocol IN (?)", []string{"vmess", "vless", "trojan", "shadowsocks", "hysteria", "hysteria2"}).Find(&inbounds).Error
|
||
if err != nil && err != gorm.ErrRecordNotFound {
|
||
return
|
||
}
|
||
for inbound_index := range inbounds {
|
||
settings := map[string]any{}
|
||
json.Unmarshal([]byte(inbounds[inbound_index].Settings), &settings)
|
||
clients, ok := settings["clients"].([]any)
|
||
if ok {
|
||
// Fix Client configuration problems
|
||
var newClients []any
|
||
hasVisionFlow := false
|
||
for client_index := range clients {
|
||
c := clients[client_index].(map[string]any)
|
||
|
||
// Add email='' if it is not exists
|
||
if _, ok := c["email"]; !ok {
|
||
c["email"] = ""
|
||
}
|
||
|
||
// Convert string tgId to int64
|
||
if _, ok := c["tgId"]; ok {
|
||
var tgId any = c["tgId"]
|
||
if tgIdStr, ok2 := tgId.(string); ok2 {
|
||
tgIdInt64, err := strconv.ParseInt(strings.ReplaceAll(tgIdStr, " ", ""), 10, 64)
|
||
if err == nil {
|
||
c["tgId"] = tgIdInt64
|
||
}
|
||
}
|
||
}
|
||
|
||
// Remove "flow": "xtls-rprx-direct"
|
||
if _, ok := c["flow"]; ok {
|
||
if c["flow"] == "xtls-rprx-direct" {
|
||
c["flow"] = ""
|
||
}
|
||
}
|
||
if flow, _ := c["flow"].(string); flow == "xtls-rprx-vision" {
|
||
hasVisionFlow = true
|
||
}
|
||
// Backfill created_at and updated_at
|
||
if _, ok := c["created_at"]; !ok {
|
||
c["created_at"] = time.Now().Unix() * 1000
|
||
}
|
||
c["updated_at"] = time.Now().Unix() * 1000
|
||
newClients = append(newClients, any(c))
|
||
}
|
||
settings["clients"] = newClients
|
||
|
||
// Drop orphaned testseed: VLESS-only field, only meaningful when at least
|
||
// one client uses the exact xtls-rprx-vision flow. Older versions saved it
|
||
// for any non-empty flow (including the UDP variant) or kept it after the
|
||
// flow was cleared from the client modal — clean those up here.
|
||
if inbounds[inbound_index].Protocol == model.VLESS && !hasVisionFlow {
|
||
delete(settings, "testseed")
|
||
}
|
||
|
||
modifiedSettings, err := json.MarshalIndent(settings, "", " ")
|
||
if err != nil {
|
||
return
|
||
}
|
||
|
||
inbounds[inbound_index].Settings = string(modifiedSettings)
|
||
}
|
||
|
||
// Add client traffic row for all clients which has email
|
||
modelClients, err := s.GetClients(inbounds[inbound_index])
|
||
if err != nil {
|
||
return
|
||
}
|
||
for _, modelClient := range modelClients {
|
||
if len(modelClient.Email) > 0 {
|
||
var count int64
|
||
tx.Model(xray.ClientTraffic{}).Where("email = ?", modelClient.Email).Count(&count)
|
||
if count == 0 {
|
||
s.AddClientStat(tx, inbounds[inbound_index].Id, &modelClient)
|
||
}
|
||
}
|
||
}
|
||
|
||
// Heal clients table for installs where the one-shot seeder
|
||
// skipped clients due to a tgId-string unmarshal error.
|
||
if syncErr := s.clientService.SyncInbound(tx, inbounds[inbound_index].Id, modelClients); syncErr != nil {
|
||
logger.Warning("MigrationRequirements sync clients failed:", syncErr)
|
||
}
|
||
}
|
||
tx.Save(inbounds)
|
||
|
||
// Remove orphaned traffics
|
||
tx.Where("inbound_id = 0").Delete(xray.ClientTraffic{})
|
||
|
||
// Migrate old MultiDomain to External Proxy
|
||
var externalProxy []struct {
|
||
Id int
|
||
Port int
|
||
StreamSettings []byte
|
||
}
|
||
err = tx.Raw(`select id, port, stream_settings
|
||
from inbounds
|
||
WHERE protocol in ('vmess','vless','trojan')
|
||
AND json_extract(stream_settings, '$.security') = 'tls'
|
||
AND json_extract(stream_settings, '$.tlsSettings.settings.domains') IS NOT NULL`).Scan(&externalProxy).Error
|
||
if err != nil || len(externalProxy) == 0 {
|
||
return
|
||
}
|
||
|
||
for _, ep := range externalProxy {
|
||
var reverses any
|
||
var stream map[string]any
|
||
json.Unmarshal(ep.StreamSettings, &stream)
|
||
if tlsSettings, ok := stream["tlsSettings"].(map[string]any); ok {
|
||
if settings, ok := tlsSettings["settings"].(map[string]any); ok {
|
||
if domains, ok := settings["domains"].([]any); ok {
|
||
for _, domain := range domains {
|
||
if domainMap, ok := domain.(map[string]any); ok {
|
||
domainMap["forceTls"] = "same"
|
||
domainMap["port"] = ep.Port
|
||
domainMap["dest"] = domainMap["domain"].(string)
|
||
delete(domainMap, "domain")
|
||
}
|
||
}
|
||
}
|
||
reverses = settings["domains"]
|
||
delete(settings, "domains")
|
||
}
|
||
}
|
||
stream["externalProxy"] = reverses
|
||
newStream, _ := json.MarshalIndent(stream, " ", " ")
|
||
tx.Model(model.Inbound{}).Where("id = ?", ep.Id).Update("stream_settings", newStream)
|
||
}
|
||
|
||
err = tx.Raw(`UPDATE inbounds
|
||
SET tag = REPLACE(tag, '0.0.0.0:', '')
|
||
WHERE INSTR(tag, '0.0.0.0:') > 0;`).Error
|
||
if err != nil {
|
||
return
|
||
}
|
||
}
|
||
|
||
func (s *InboundService) MigrateDB() {
|
||
s.MigrationRequirements()
|
||
s.MigrationRemoveOrphanedTraffics()
|
||
}
|
||
|
||
func (s *InboundService) GetOnlineClients() []string {
|
||
return p.GetOnlineClients()
|
||
}
|
||
|
||
func (s *InboundService) SetNodeOnlineClients(nodeID int, emails []string) {
|
||
if p != nil {
|
||
p.SetNodeOnlineClients(nodeID, emails)
|
||
}
|
||
}
|
||
|
||
func (s *InboundService) ClearNodeOnlineClients(nodeID int) {
|
||
if p != nil {
|
||
p.ClearNodeOnlineClients(nodeID)
|
||
}
|
||
}
|
||
|
||
func (s *InboundService) GetClientsLastOnline() (map[string]int64, error) {
|
||
db := database.GetDB()
|
||
var rows []xray.ClientTraffic
|
||
err := db.Model(&xray.ClientTraffic{}).Select("email, last_online").Find(&rows).Error
|
||
if err != nil && err != gorm.ErrRecordNotFound {
|
||
return nil, err
|
||
}
|
||
result := make(map[string]int64, len(rows))
|
||
for _, r := range rows {
|
||
result[r.Email] = r.LastOnline
|
||
}
|
||
return result, nil
|
||
}
|
||
|
||
func (s *InboundService) RefreshOnlineClientsFromMap(lastOnlineMap map[string]int64) {
|
||
now := time.Now().UnixMilli()
|
||
newOnlineClients := make([]string, 0, len(lastOnlineMap))
|
||
for email, lastOnline := range lastOnlineMap {
|
||
if now-lastOnline < onlineGracePeriodMs {
|
||
newOnlineClients = append(newOnlineClients, email)
|
||
}
|
||
}
|
||
if p != nil {
|
||
p.SetOnlineClients(newOnlineClients)
|
||
}
|
||
}
|
||
|
||
func (s *InboundService) FilterAndSortClientEmails(emails []string) ([]string, []string, error) {
|
||
db := database.GetDB()
|
||
|
||
// Step 1: Get ClientTraffic records for emails in the input list.
|
||
// Chunked to stay under SQLite's bind-variable limit on huge inputs.
|
||
uniqEmails := uniqueNonEmptyStrings(emails)
|
||
clients := make([]xray.ClientTraffic, 0, len(uniqEmails))
|
||
for _, batch := range chunkStrings(uniqEmails, sqliteMaxVars) {
|
||
var page []xray.ClientTraffic
|
||
if err := db.Where("email IN ?", batch).Find(&page).Error; err != nil && err != gorm.ErrRecordNotFound {
|
||
return nil, nil, err
|
||
}
|
||
clients = append(clients, page...)
|
||
}
|
||
|
||
// Step 2: Sort clients by (Up + Down) descending
|
||
sort.Slice(clients, func(i, j int) bool {
|
||
return (clients[i].Up + clients[i].Down) > (clients[j].Up + clients[j].Down)
|
||
})
|
||
|
||
// Step 3: Extract sorted valid emails and track found ones
|
||
validEmails := make([]string, 0, len(clients))
|
||
found := make(map[string]bool)
|
||
for _, client := range clients {
|
||
validEmails = append(validEmails, client.Email)
|
||
found[client.Email] = true
|
||
}
|
||
|
||
// Step 4: Identify emails that were not found in the database
|
||
extraEmails := make([]string, 0)
|
||
for _, email := range emails {
|
||
if !found[email] {
|
||
extraEmails = append(extraEmails, email)
|
||
}
|
||
}
|
||
|
||
return validEmails, extraEmails, nil
|
||
}
|
||
|
||
type SubLinkProvider interface {
|
||
SubLinksForSubId(host, subId string) ([]string, error)
|
||
LinksForClient(host string, inbound *model.Inbound, email string) []string
|
||
}
|
||
|
||
var registeredSubLinkProvider SubLinkProvider
|
||
|
||
func RegisterSubLinkProvider(p SubLinkProvider) {
|
||
registeredSubLinkProvider = p
|
||
}
|
||
|
||
func (s *InboundService) GetSubLinks(host, subId string) ([]string, error) {
|
||
if registeredSubLinkProvider == nil {
|
||
return nil, common.NewError("sub link provider not registered")
|
||
}
|
||
return registeredSubLinkProvider.SubLinksForSubId(host, subId)
|
||
}
|
||
func (s *InboundService) GetAllClientLinks(host string, email string) ([]string, error) {
|
||
if email == "" {
|
||
return nil, common.NewError("client email is required")
|
||
}
|
||
if registeredSubLinkProvider == nil {
|
||
return nil, common.NewError("sub link provider not registered")
|
||
}
|
||
rec, err := s.clientService.GetRecordByEmail(nil, email)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
inboundIds, err := s.clientService.GetInboundIdsForRecord(rec.Id)
|
||
if err != nil {
|
||
return nil, err
|
||
}
|
||
var links []string
|
||
for _, ibId := range inboundIds {
|
||
inbound, getErr := s.GetInbound(ibId)
|
||
if getErr != nil {
|
||
return nil, getErr
|
||
}
|
||
links = append(links, registeredSubLinkProvider.LinksForClient(host, inbound, email)...)
|
||
}
|
||
return links, nil
|
||
}
|