Files
3x-ui/database/migrate_data.go
MHSanaei c8ad42631c fix(migrate): copy composite-key tables without FindInBatches (#4787)
SQLite to Postgres migration aborted with "copy *model.ClientInbound:
primary key required" on installs whose client_inbounds table exceeds
one read batch (500 rows). gorm's FindInBatches pages between batches
using a single PrioritizedPrimaryField, which composite-key tables
(client_id + inbound_id, no surrogate id) do not have, so it returns
ErrPrimaryKeyRequired once a table holds more than one batch.

Replace FindInBatches in copyTable with explicit LIMIT/OFFSET paging
ordered by the model's primary-key columns. This works for every table
including composite-key ones, keeps memory bounded, and changes no
schema.

Add a Postgres-gated regression test covering a >500-row composite-key
table.
2026-06-02 04:20:42 +02:00

174 lines
4.9 KiB
Go

package database
import (
"errors"
"fmt"
"log"
"os"
"path"
"reflect"
"strings"
"time"
"github.com/mhsanaei/3x-ui/v3/database/model"
"github.com/mhsanaei/3x-ui/v3/xray"
"gorm.io/driver/postgres"
"gorm.io/driver/sqlite"
"gorm.io/gorm"
"gorm.io/gorm/logger"
)
// migrationModels returns the models handled by the migration, in the order
// their tables are created and copied. Parent models are listed ahead of the
// models that reference them, so foreign-key constraints remain satisfied even
// when constraint checks are left enabled. Every call builds a new slice of
// freshly allocated, empty model values.
func migrationModels() []any {
	ordered := make([]any, 0, 14)
	ordered = append(ordered,
		&model.User{},
		&model.Setting{},
		&model.HistoryOfSeeders{},
		&model.CustomGeoResource{},
		&model.Node{},
		&model.ApiToken{},
		&model.Inbound{},
		&xray.ClientTraffic{},
		&model.OutboundTraffics{},
		&model.InboundClientIps{},
		&model.ClientRecord{},
		&model.ClientInbound{},
		&model.InboundFallback{},
		&model.NodeClientTraffic{},
	)
	return ordered
}
// MigrateData copies every row from the SQLite file at srcPath into the
// PostgreSQL database described by dstDSN.
//
// The destination schema is created (or updated) with AutoMigrate for each
// model returned by migrationModels; the tables are then copied in that same
// order and the Postgres id sequences are resynchronised. Rows are only read
// from the source. A failure while resetting sequences is logged as a warning
// and does not fail the migration.
//
// An error is returned when srcPath cannot be stat'ed, dstDSN is empty, either
// database cannot be opened, or a schema or copy step fails. No transaction is
// opened here, so rows copied before a failure stay in the destination.
func MigrateData(srcPath, dstDSN string) error {
	if _, err := os.Stat(srcPath); err != nil {
		return fmt.Errorf("source sqlite not found at %s: %w", srcPath, err)
	}
	if dstDSN == "" {
		return errors.New("destination DSN is required")
	}
	// Normally a no-op because the Stat above already succeeded; kept so the
	// directory is guaranteed to exist before SQLite opens the file.
	if err := os.MkdirAll(path.Dir(srcPath), 0755); err != nil {
		return fmt.Errorf("prepare source directory: %w", err)
	}

	srcDSN := srcPath + "?_journal_mode=WAL&_busy_timeout=10000"
	src, err := gorm.Open(sqlite.Open(srcDSN), &gorm.Config{Logger: logger.Discard})
	if err != nil {
		return fmt.Errorf("open sqlite source: %w", err)
	}
	srcSQL, err := src.DB()
	if err != nil {
		return fmt.Errorf("get sqlite connection handle: %w", err)
	}
	defer srcSQL.Close()

	dst, err := gorm.Open(postgres.Open(dstDSN), &gorm.Config{Logger: logger.Discard})
	if err != nil {
		return fmt.Errorf("open postgres destination: %w", err)
	}
	dstSQL, err := dst.DB()
	if err != nil {
		return fmt.Errorf("get postgres connection handle: %w", err)
	}
	defer dstSQL.Close()
	dstSQL.SetConnMaxLifetime(time.Hour)

	// Build the model list once so schema creation, the copy loop and the
	// final summary all work from the same slice.
	models := migrationModels()

	log.Println("Creating destination schema...")
	for _, m := range models {
		if err := dst.AutoMigrate(m); err != nil {
			return fmt.Errorf("AutoMigrate %T: %w", m, err)
		}
	}

	totalRows := 0
	for _, m := range models {
		n, err := copyTable(src, dst, m)
		if err != nil {
			return fmt.Errorf("copy %T: %w", m, err)
		}
		totalRows += n
		log.Printf("  %-32s %d rows", reflect.TypeOf(m).Elem().Name(), n)
	}

	// Sequence resync is best-effort: the data is already copied, so a failure
	// here is reported but does not abort the migration.
	if err := resetPostgresSequences(dst); err != nil {
		log.Printf("warning: failed to reset some postgres sequences: %v", err)
	}

	log.Printf("Migration complete: %d rows across %d tables.", totalRows, len(models))
	log.Println("Set XUI_DB_TYPE=postgres and XUI_DB_DSN=... in /etc/default/x-ui, then restart x-ui.")
	return nil
}
// copyTable copies all rows of mdl's table from src to dst and returns the
// number of rows copied.
//
// mdl must be a pointer to a model struct. Rows are read in pages of
// readBatchSize using LIMIT/OFFSET, ordered by the model's primary-key columns
// so successive pages are deterministic, and written with CreateInBatches.
// Unlike FindInBatches, this does not require a single primary-key field, so
// composite-key tables are handled too.
//
// On failure the returned count is the number of rows inserted by the pages
// that completed, and the error says whether the schema parse, the source read
// or the destination insert failed, including the page offset for the latter
// two.
func copyTable(src, dst *gorm.DB, mdl any) (int, error) {
	const (
		readBatchSize  = 500 // rows fetched from the source per page
		writeBatchSize = 200 // rows per INSERT issued by CreateInBatches
	)

	// Element type is *T so each page is a []*T for the model type T.
	sliceType := reflect.SliceOf(reflect.PointerTo(reflect.TypeOf(mdl).Elem()))

	// Resolve primary-key columns so paging is deterministic across successive
	// LIMIT/OFFSET reads. The model set is trusted (not user input), so the
	// column names are safe to pass to Order as raw SQL.
	stmt := &gorm.Statement{DB: src}
	if err := stmt.Parse(mdl); err != nil {
		return 0, fmt.Errorf("parse schema: %w", err)
	}
	// NOTE(review): when a model declares no primary key, order is empty and
	// no ORDER BY is applied, so page order is whatever the database returns.
	order := strings.Join(stmt.Schema.PrimaryFieldDBNames, ", ")

	total := 0
	for offset := 0; ; offset += readBatchSize {
		batchPtr := reflect.New(sliceType)
		q := src.Model(mdl).Limit(readBatchSize).Offset(offset)
		if order != "" {
			q = q.Order(order)
		}
		if err := q.Find(batchPtr.Interface()).Error; err != nil {
			return total, fmt.Errorf("read rows at offset %d: %w", offset, err)
		}

		n := batchPtr.Elem().Len()
		if n == 0 {
			break
		}
		if err := dst.CreateInBatches(batchPtr.Interface(), writeBatchSize).Error; err != nil {
			return total, fmt.Errorf("insert rows at offset %d: %w", offset, err)
		}
		total += n

		// A short page means the table is exhausted; skip the extra empty read.
		if n < readBatchSize {
			break
		}
	}
	return total, nil
}
// resetPostgresSequences resynchronises the id sequences of all migrated
// tables on dst by delegating to resyncPostgresSequences with the full
// migrationModels list. Without this, the next INSERT that omits id could
// collide with a copied row.
func resetPostgresSequences(dst *gorm.DB) error {
	migrated := migrationModels()
	return resyncPostgresSequences(dst, migrated)
}
// resyncPostgresSequences sets the id sequence of each model's table to
// MAX(id), or to 1 when the table has no rows, so the next auto-increment
// INSERT does not collide with an existing row.
//
// Table names are taken from the models as parsed by GORM rather than being
// hardcoded. The WHERE clause turns the statement into a no-op when
// pg_get_serial_sequence reports no sequence for the id column.
//
// The function is best-effort: a model that fails to parse is skipped, the
// result of every statement is discarded, and the return value is always nil.
//
// NOTE(review): for tables that have no id column at all (e.g. composite-PK
// tables) the statement presumably fails rather than no-ops; that error is
// discarded like any other — confirm against Postgres.
func resyncPostgresSequences(db *gorm.DB, models []any) error {
	for i := range models {
		parsed := &gorm.Statement{DB: db}
		if parsed.Parse(models[i]) != nil {
			continue
		}
		table := parsed.Table

		// table comes from the trusted model set parsed by GORM, not from user
		// input, so splicing it in as a quoted identifier is safe.
		query := `SELECT setval(pg_get_serial_sequence(?, 'id'), COALESCE((SELECT MAX(id) FROM "` + table + `"), 1), true)` + "\n" +
			`WHERE pg_get_serial_sequence(?, 'id') IS NOT NULL`

		// Per-table errors are deliberately ignored so one failing table does
		// not stop the remaining sequences from being resynchronised.
		_ = db.Exec(query, table, table).Error
	}
	return nil
}