From c32cffeb83c31f45ed3dd8d410c0fbe1c13cd1a8 Mon Sep 17 00:00:00 2001 From: keven1024 Date: Sun, 25 May 2025 15:31:38 +0800 Subject: [PATCH] feat(backend): implement image compression functionality and enhance Redis integration with new utility functions --- worker/go.mod | 4 ++- worker/go.sum | 6 +++++ worker/internal/tasks/image.go | 37 ++++++++++++++++++++++++++ worker/internal/utils/asyncq.go | 11 ++++++++ worker/internal/utils/env.go | 12 ++++++++- worker/internal/utils/file.go | 47 +++++++++++++++++++++++++++++++++ worker/internal/utils/redis.go | 22 +++++++++++++++ worker/main.go | 13 +++++++++ worker/middleware/logger.go | 24 +++++++++++++++++ 9 files changed, 174 insertions(+), 2 deletions(-) create mode 100644 worker/internal/utils/asyncq.go create mode 100644 worker/internal/utils/file.go create mode 100644 worker/internal/utils/redis.go create mode 100644 worker/middleware/logger.go diff --git a/worker/go.mod b/worker/go.mod index 7a4072e..10bac18 100644 --- a/worker/go.mod +++ b/worker/go.mod @@ -3,6 +3,7 @@ module worker go 1.23.1 require ( + dario.cat/mergo v1.0.2 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect github.com/fsnotify/fsnotify v1.8.0 // indirect @@ -20,7 +21,8 @@ require ( github.com/spf13/viper v1.20.1 // indirect github.com/subosito/gotenv v1.6.0 // indirect go.uber.org/atomic v1.9.0 // indirect - go.uber.org/multierr v1.9.0 // indirect + go.uber.org/multierr v1.10.0 // indirect + go.uber.org/zap v1.27.0 // indirect golang.org/x/sys v0.29.0 // indirect golang.org/x/text v0.21.0 // indirect golang.org/x/time v0.8.0 // indirect diff --git a/worker/go.sum b/worker/go.sum index 43b3959..14e38f9 100644 --- a/worker/go.sum +++ b/worker/go.sum @@ -1,3 +1,5 @@ +dario.cat/mergo v1.0.2 h1:85+piFYR1tMbRrLcDwR18y4UKJ3aH1Tbzi24VRW1TK8= +dario.cat/mergo v1.0.2/go.mod h1:E/hbnu0NxMFBjpMIE34DRGLWqDy0g5FuKDhCb31ngxA= github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44= github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= @@ -45,6 +47,10 @@ go.uber.org/atomic v1.9.0 h1:ECmE8Bn/WFTYwEW/bpKD3M8VtR/zQVbavAoalC1PYyE= go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= go.uber.org/multierr v1.9.0 h1:7fIwc/ZtS0q++VgcfqFDxSBZVv/Xo49/SYnDFupUwlI= go.uber.org/multierr v1.9.0/go.mod h1:X2jQV1h+kxSjClGpnseKVIxpmcjrj7MNnI0bnlfKTVQ= +go.uber.org/multierr v1.10.0 h1:S0h4aNzvfcFsC3dRF1jLoaov7oRaKqRGC/pUEJ2yvPQ= +go.uber.org/multierr v1.10.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= +go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8= +go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E= golang.org/x/sys v0.27.0 h1:wBqf8DvsY9Y/2P8gAfPDEYNuS30J4lPHJxXSb/nJZ+s= golang.org/x/sys v0.27.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/sys v0.29.0 h1:TPYlXGxvx1MGTn2GiZDhnjPA9wZzZeGKHHmKhHYvgaU= diff --git a/worker/internal/tasks/image.go b/worker/internal/tasks/image.go index 3cb2df9..debc75e 100644 --- a/worker/internal/tasks/image.go +++ b/worker/internal/tasks/image.go @@ -3,6 +3,13 @@ package tasks import ( "context" "encoding/json" + "errors" + "fmt" + "os/exec" + "path/filepath" + "worker/internal/models" + "worker/internal/services" + "worker/internal/utils" "github.com/hibiken/asynq" ) @@ -20,6 +27,36 @@ func CompressImage(ctx context.Context, task *asynq.Task) error { if err := json.Unmarshal(task.Payload(), &payload); err != nil { return err } + originalFileInfo, _ := models.GetRedisFileInfo(payload.FileId) + if originalFileInfo == nil || originalFileInfo.FileType != models.FileTypeUpload { + return errors.New("文件不存在") + } + uploadPath, err := utils.GetUploadDirPath() + if err != nil { + return err + } + originalPath := filepath.Join(uploadPath, payload.FileId) + switch originalFileInfo.MimeType { + case "image/png": + args := []string{"--output", originalPath + "_compressed", originalPath} + cmd := exec.Command("pngquant", args...) + _, err := cmd.CombinedOutput() + if err != nil { + return err + } + default: + return errors.New("不支持的文件类型") + } + compressedPath := fmt.Sprintf("%s_compressed", originalPath) + compressedFileId, err := services.GenStandardFile(compressedPath, originalFileInfo.MimeType) + if err != nil { + return err + } + + models.SetRedisTaskInfo(task.ResultWriter().TaskID(), map[string]any{ + "status": "success", + "file_id": compressedFileId, + }) return nil } diff --git a/worker/internal/utils/asyncq.go b/worker/internal/utils/asyncq.go new file mode 100644 index 0000000..9ed612f --- /dev/null +++ b/worker/internal/utils/asyncq.go @@ -0,0 +1,11 @@ +package utils + +import "github.com/hibiken/asynq" + +func RedisURI2AsynqOpt(uri string) asynq.RedisConnOpt { + opt, err := asynq.ParseRedisURI(GetEnv("REDIS_URL")) + if err != nil { + panic(err) + } + return opt +} diff --git a/worker/internal/utils/env.go b/worker/internal/utils/env.go index dcd8893..dc308cf 100644 --- a/worker/internal/utils/env.go +++ b/worker/internal/utils/env.go @@ -4,12 +4,21 @@ import ( "github.com/spf13/viper" ) -var v = viper.New() +var v *viper.Viper func init() { + InitEnv() +} + +func InitEnv() { + if v != nil { + return + } + v = viper.New() v.SetConfigName(".env") v.SetConfigType("env") v.AddConfigPath(".") + v.AddConfigPath("../") v.AutomaticEnv() if err := v.ReadInConfig(); err != nil { panic(err) @@ -17,6 +26,7 @@ func init() { } func GetEnv(key string) string { + InitEnv() return v.GetString(key) } diff --git a/worker/internal/utils/file.go b/worker/internal/utils/file.go new file mode 100644 index 0000000..eba1f3d --- /dev/null +++ b/worker/internal/utils/file.go @@ -0,0 +1,47 @@ +package utils + +import ( + "bufio" + "crypto/md5" + "fmt" + "io" + "os" + "path/filepath" +) + +func GetFileId(fileHash string, fileSize int64) string { + return fmt.Sprintf("%s_%d", fileHash, fileSize) +} + +func GetFileMd5(file io.Reader) (string, error) { + + const bufferSize = 1024 * 1000 // 1MB + + hash := md5.New() + for buf, reader := make([]byte, bufferSize), bufio.NewReader(file); ; { + n, err := reader.Read(buf) + if err != nil { + if err == io.EOF { + break + } + return "", err + } + + hash.Write(buf[:n]) + } + + return fmt.Sprintf("%x", hash.Sum(nil)), nil +} + +func GetUploadDirPath() (string, error) { + basepath, err := os.Getwd() + if err != nil { + return "", err + } + finalPath := filepath.Join(basepath, "uploads") + uploadPath := GetEnvWithDefault("UPLOAD_PATH", finalPath) + if err := os.MkdirAll(uploadPath, 0755); err != nil { + return "", err + } + return uploadPath, nil +} diff --git a/worker/internal/utils/redis.go b/worker/internal/utils/redis.go new file mode 100644 index 0000000..f770d76 --- /dev/null +++ b/worker/internal/utils/redis.go @@ -0,0 +1,22 @@ +package utils + +import ( + "context" + + "github.com/redis/go-redis/v9" +) + +var rdb *redis.Client = InitRedis() +var ctx = context.Background() + +func InitRedis() *redis.Client { + opt, err := redis.ParseURL(GetEnv("REDIS_URL")) + if err != nil { + panic(err) + } + return redis.NewClient(opt) +} + +func GetRedisClient() (*redis.Client, context.Context) { + return rdb, ctx +} diff --git a/worker/main.go b/worker/main.go index 66204e4..b54cf39 100644 --- a/worker/main.go +++ b/worker/main.go @@ -4,18 +4,31 @@ import ( "log" "worker/internal/tasks" "worker/internal/utils" + "worker/middleware" "github.com/hibiken/asynq" "github.com/spf13/cast" + "go.uber.org/zap" ) func main() { + // 日志 + var logger *zap.Logger + if utils.GetEnvWithDefault("NODE_ENV", "production") == "production" { + logger, _ = zap.NewProduction() + } else { + logger, _ = zap.NewDevelopment() + } + defer logger.Sync() + zap.ReplaceGlobals(logger) + srv := asynq.NewServer( utils.RedisURI2AsynqOpt(utils.GetEnv("REDIS_URL")), asynq.Config{Concurrency: cast.ToInt(utils.GetEnvWithDefault("WORKER_CONCURRENCY", "4"))}, ) mux := asynq.NewServeMux() + mux.Use(middleware.LoggerMiddleware) mux.HandleFunc("image:compress", tasks.CompressImage) if err := srv.Run(mux); err != nil { diff --git a/worker/middleware/logger.go b/worker/middleware/logger.go new file mode 100644 index 0000000..de3ab51 --- /dev/null +++ b/worker/middleware/logger.go @@ -0,0 +1,24 @@ +package middleware + +import ( + "context" + "time" + + "github.com/hibiken/asynq" + "go.uber.org/zap" +) + +func LoggerMiddleware(h asynq.Handler) asynq.Handler { + return asynq.HandlerFunc(func(ctx context.Context, t *asynq.Task) error { + start := time.Now() + task_type := zap.String("type", t.Type()) + zap.L().Info("[%q] - 开始处理", task_type) + err := h.ProcessTask(ctx, t) + if err != nil { + zap.L().Error("[%q] - 处理失败 - %v", task_type, zap.Error(err)) + return err + } + zap.L().Info("[%q] - 完成处理 | 耗时 %v", task_type, zap.Duration("duration", time.Since(start))) + return nil + }) +}