mirror of
https://github.com/keven1024/015.git
synced 2026-05-26 15:13:30 +00:00
feat(backend): implement image compression functionality and enhance Redis integration with new utility functions
This commit is contained in:
@@ -3,6 +3,7 @@ module worker
|
||||
go 1.23.1
|
||||
|
||||
require (
|
||||
dario.cat/mergo v1.0.2 // indirect
|
||||
github.com/cespare/xxhash/v2 v2.3.0 // indirect
|
||||
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
|
||||
github.com/fsnotify/fsnotify v1.8.0 // indirect
|
||||
@@ -20,7 +21,8 @@ require (
|
||||
github.com/spf13/viper v1.20.1 // indirect
|
||||
github.com/subosito/gotenv v1.6.0 // indirect
|
||||
go.uber.org/atomic v1.9.0 // indirect
|
||||
go.uber.org/multierr v1.9.0 // indirect
|
||||
go.uber.org/multierr v1.10.0 // indirect
|
||||
go.uber.org/zap v1.27.0 // indirect
|
||||
golang.org/x/sys v0.29.0 // indirect
|
||||
golang.org/x/text v0.21.0 // indirect
|
||||
golang.org/x/time v0.8.0 // indirect
|
||||
|
||||
@@ -1,3 +1,5 @@
|
||||
dario.cat/mergo v1.0.2 h1:85+piFYR1tMbRrLcDwR18y4UKJ3aH1Tbzi24VRW1TK8=
|
||||
dario.cat/mergo v1.0.2/go.mod h1:E/hbnu0NxMFBjpMIE34DRGLWqDy0g5FuKDhCb31ngxA=
|
||||
github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
|
||||
github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
|
||||
github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs=
|
||||
@@ -45,6 +47,10 @@ go.uber.org/atomic v1.9.0 h1:ECmE8Bn/WFTYwEW/bpKD3M8VtR/zQVbavAoalC1PYyE=
|
||||
go.uber.org/atomic v1.9.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc=
|
||||
go.uber.org/multierr v1.9.0 h1:7fIwc/ZtS0q++VgcfqFDxSBZVv/Xo49/SYnDFupUwlI=
|
||||
go.uber.org/multierr v1.9.0/go.mod h1:X2jQV1h+kxSjClGpnseKVIxpmcjrj7MNnI0bnlfKTVQ=
|
||||
go.uber.org/multierr v1.10.0 h1:S0h4aNzvfcFsC3dRF1jLoaov7oRaKqRGC/pUEJ2yvPQ=
|
||||
go.uber.org/multierr v1.10.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y=
|
||||
go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8=
|
||||
go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E=
|
||||
golang.org/x/sys v0.27.0 h1:wBqf8DvsY9Y/2P8gAfPDEYNuS30J4lPHJxXSb/nJZ+s=
|
||||
golang.org/x/sys v0.27.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
|
||||
golang.org/x/sys v0.29.0 h1:TPYlXGxvx1MGTn2GiZDhnjPA9wZzZeGKHHmKhHYvgaU=
|
||||
|
||||
@@ -3,6 +3,13 @@ package tasks
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"os/exec"
|
||||
"path/filepath"
|
||||
"worker/internal/models"
|
||||
"worker/internal/services"
|
||||
"worker/internal/utils"
|
||||
|
||||
"github.com/hibiken/asynq"
|
||||
)
|
||||
@@ -20,6 +27,36 @@ func CompressImage(ctx context.Context, task *asynq.Task) error {
|
||||
if err := json.Unmarshal(task.Payload(), &payload); err != nil {
|
||||
return err
|
||||
}
|
||||
originalFileInfo, _ := models.GetRedisFileInfo(payload.FileId)
|
||||
if originalFileInfo == nil || originalFileInfo.FileType != models.FileTypeUpload {
|
||||
return errors.New("文件不存在")
|
||||
}
|
||||
uploadPath, err := utils.GetUploadDirPath()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
originalPath := filepath.Join(uploadPath, payload.FileId)
|
||||
switch originalFileInfo.MimeType {
|
||||
case "image/png":
|
||||
args := []string{"--output", originalPath + "_compressed", originalPath}
|
||||
cmd := exec.Command("pngquant", args...)
|
||||
_, err := cmd.CombinedOutput()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
default:
|
||||
return errors.New("不支持的文件类型")
|
||||
}
|
||||
compressedPath := fmt.Sprintf("%s_compressed", originalPath)
|
||||
compressedFileId, err := services.GenStandardFile(compressedPath, originalFileInfo.MimeType)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
models.SetRedisTaskInfo(task.ResultWriter().TaskID(), map[string]any{
|
||||
"status": "success",
|
||||
"file_id": compressedFileId,
|
||||
})
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
11
worker/internal/utils/asyncq.go
Normal file
11
worker/internal/utils/asyncq.go
Normal file
@@ -0,0 +1,11 @@
|
||||
package utils
|
||||
|
||||
import "github.com/hibiken/asynq"
|
||||
|
||||
func RedisURI2AsynqOpt(uri string) asynq.RedisConnOpt {
|
||||
opt, err := asynq.ParseRedisURI(GetEnv("REDIS_URL"))
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
return opt
|
||||
}
|
||||
@@ -4,12 +4,21 @@ import (
|
||||
"github.com/spf13/viper"
|
||||
)
|
||||
|
||||
var v = viper.New()
|
||||
var v *viper.Viper
|
||||
|
||||
func init() {
|
||||
InitEnv()
|
||||
}
|
||||
|
||||
func InitEnv() {
|
||||
if v != nil {
|
||||
return
|
||||
}
|
||||
v = viper.New()
|
||||
v.SetConfigName(".env")
|
||||
v.SetConfigType("env")
|
||||
v.AddConfigPath(".")
|
||||
v.AddConfigPath("../")
|
||||
v.AutomaticEnv()
|
||||
if err := v.ReadInConfig(); err != nil {
|
||||
panic(err)
|
||||
@@ -17,6 +26,7 @@ func init() {
|
||||
}
|
||||
|
||||
func GetEnv(key string) string {
|
||||
InitEnv()
|
||||
return v.GetString(key)
|
||||
}
|
||||
|
||||
|
||||
47
worker/internal/utils/file.go
Normal file
47
worker/internal/utils/file.go
Normal file
@@ -0,0 +1,47 @@
|
||||
package utils
|
||||
|
||||
import (
|
||||
"bufio"
|
||||
"crypto/md5"
|
||||
"fmt"
|
||||
"io"
|
||||
"os"
|
||||
"path/filepath"
|
||||
)
|
||||
|
||||
func GetFileId(fileHash string, fileSize int64) string {
|
||||
return fmt.Sprintf("%s_%d", fileHash, fileSize)
|
||||
}
|
||||
|
||||
func GetFileMd5(file io.Reader) (string, error) {
|
||||
|
||||
const bufferSize = 1024 * 1000 // 1MB
|
||||
|
||||
hash := md5.New()
|
||||
for buf, reader := make([]byte, bufferSize), bufio.NewReader(file); ; {
|
||||
n, err := reader.Read(buf)
|
||||
if err != nil {
|
||||
if err == io.EOF {
|
||||
break
|
||||
}
|
||||
return "", err
|
||||
}
|
||||
|
||||
hash.Write(buf[:n])
|
||||
}
|
||||
|
||||
return fmt.Sprintf("%x", hash.Sum(nil)), nil
|
||||
}
|
||||
|
||||
func GetUploadDirPath() (string, error) {
|
||||
basepath, err := os.Getwd()
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
finalPath := filepath.Join(basepath, "uploads")
|
||||
uploadPath := GetEnvWithDefault("UPLOAD_PATH", finalPath)
|
||||
if err := os.MkdirAll(uploadPath, 0755); err != nil {
|
||||
return "", err
|
||||
}
|
||||
return uploadPath, nil
|
||||
}
|
||||
22
worker/internal/utils/redis.go
Normal file
22
worker/internal/utils/redis.go
Normal file
@@ -0,0 +1,22 @@
|
||||
package utils
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/redis/go-redis/v9"
|
||||
)
|
||||
|
||||
var rdb *redis.Client = InitRedis()
|
||||
var ctx = context.Background()
|
||||
|
||||
func InitRedis() *redis.Client {
|
||||
opt, err := redis.ParseURL(GetEnv("REDIS_URL"))
|
||||
if err != nil {
|
||||
panic(err)
|
||||
}
|
||||
return redis.NewClient(opt)
|
||||
}
|
||||
|
||||
func GetRedisClient() (*redis.Client, context.Context) {
|
||||
return rdb, ctx
|
||||
}
|
||||
@@ -4,18 +4,31 @@ import (
|
||||
"log"
|
||||
"worker/internal/tasks"
|
||||
"worker/internal/utils"
|
||||
"worker/middleware"
|
||||
|
||||
"github.com/hibiken/asynq"
|
||||
"github.com/spf13/cast"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
func main() {
|
||||
// 日志
|
||||
var logger *zap.Logger
|
||||
if utils.GetEnvWithDefault("NODE_ENV", "production") == "production" {
|
||||
logger, _ = zap.NewProduction()
|
||||
} else {
|
||||
logger, _ = zap.NewDevelopment()
|
||||
}
|
||||
defer logger.Sync()
|
||||
zap.ReplaceGlobals(logger)
|
||||
|
||||
srv := asynq.NewServer(
|
||||
utils.RedisURI2AsynqOpt(utils.GetEnv("REDIS_URL")),
|
||||
asynq.Config{Concurrency: cast.ToInt(utils.GetEnvWithDefault("WORKER_CONCURRENCY", "4"))},
|
||||
)
|
||||
|
||||
mux := asynq.NewServeMux()
|
||||
mux.Use(middleware.LoggerMiddleware)
|
||||
mux.HandleFunc("image:compress", tasks.CompressImage)
|
||||
|
||||
if err := srv.Run(mux); err != nil {
|
||||
|
||||
24
worker/middleware/logger.go
Normal file
24
worker/middleware/logger.go
Normal file
@@ -0,0 +1,24 @@
|
||||
package middleware
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
|
||||
"github.com/hibiken/asynq"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
func LoggerMiddleware(h asynq.Handler) asynq.Handler {
|
||||
return asynq.HandlerFunc(func(ctx context.Context, t *asynq.Task) error {
|
||||
start := time.Now()
|
||||
task_type := zap.String("type", t.Type())
|
||||
zap.L().Info("[%q] - 开始处理", task_type)
|
||||
err := h.ProcessTask(ctx, t)
|
||||
if err != nil {
|
||||
zap.L().Error("[%q] - 处理失败 - %v", task_type, zap.Error(err))
|
||||
return err
|
||||
}
|
||||
zap.L().Info("[%q] - 完成处理 | 耗时 %v", task_type, zap.Duration("duration", time.Since(start)))
|
||||
return nil
|
||||
})
|
||||
}
|
||||
Reference in New Issue
Block a user