From 6ed90e9ac97f3b0640a41c82bcbbfe266df01afc Mon Sep 17 00:00:00 2001 From: keven1024 Date: Sun, 25 May 2025 15:13:46 +0800 Subject: [PATCH] feat(backend): implement Redis file and task management with logging support --- worker/internal/models/file.go | 60 ++++++++++++++++++++++++++++++++ worker/internal/models/task.go | 35 +++++++++++++++++++ worker/internal/services/file.go | 52 +++++++++++++++++++++++++++ worker/main.go | 2 +- 4 files changed, 148 insertions(+), 1 deletion(-) create mode 100644 worker/internal/models/file.go create mode 100644 worker/internal/models/task.go create mode 100644 worker/internal/services/file.go diff --git a/worker/internal/models/file.go b/worker/internal/models/file.go new file mode 100644 index 0000000..09b0a75 --- /dev/null +++ b/worker/internal/models/file.go @@ -0,0 +1,60 @@ +package models + +import ( + "encoding/json" + "worker/internal/utils" + + "dario.cat/mergo" + "github.com/redis/go-redis/v9" +) + +type FileInfo struct { + FileSize int64 `json:"size"` + MimeType string `json:"mime_type"` + FileHash string `json:"hash"` + ChunkSize int64 `json:"chunk_size"` +} + +type FileType string + +const ( + FileTypeInit FileType = "init" + FileTypeUpload FileType = "already" +) + +type RedisFileInfo struct { + FileInfo + FileType FileType `json:"type"` + CreatedAt int64 `json:"created_at"` + Expire int64 `json:"expire"` // 只有上传文件(init)的时候有这个字段 +} + +func GetRedisFileInfo(fileId string) (*RedisFileInfo, error) { + rdb, ctx := utils.GetRedisClient() + fileInfoUnmarshalData, err := rdb.HGet(ctx, "015:fileInfoMap", fileId).Result() + if err == redis.Nil { + return nil, nil + } + if err != nil { + return nil, err + } + var fileInfoData RedisFileInfo + if err := json.Unmarshal([]byte(fileInfoUnmarshalData), &fileInfoData); err != nil { + return nil, err + } + return &fileInfoData, nil +} + +func SetRedisFileInfo(fileId string, fileInfo RedisFileInfo) error { + rdb, ctx := utils.GetRedisClient() + old_fileInfo, err := GetRedisFileInfo(fileId) + if err != nil { + return err + } + if old_fileInfo != nil { + mergo.Merge(&fileInfo, old_fileInfo) + } + jsonData, _ := json.Marshal(fileInfo) + _, err = rdb.HSet(ctx, "015:fileInfoMap", fileId, string(jsonData)).Result() + return err +} diff --git a/worker/internal/models/task.go b/worker/internal/models/task.go new file mode 100644 index 0000000..ce00b42 --- /dev/null +++ b/worker/internal/models/task.go @@ -0,0 +1,35 @@ +package models + +import ( + "encoding/json" + "fmt" + "time" + "worker/internal/utils" + + "github.com/redis/go-redis/v9" +) + +func GetRedisTaskInfo(taskId string) (*map[string]any, error) { + rdb, ctx := utils.GetRedisClient() + taskInfo := rdb.Get(ctx, fmt.Sprintf("015:taskInfoMap:%s", taskId)) + taskInfoUnmarshalData, err := taskInfo.Result() + if err == redis.Nil { + return nil, nil + } + if err != nil { + return nil, err + } + var taskInfoData map[string]any + + if err := json.Unmarshal([]byte(taskInfoUnmarshalData), &taskInfoData); err != nil { + return nil, err + } + return &taskInfoData, nil +} + +func SetRedisTaskInfo(taskId string, taskInfo map[string]any) error { + rdb, ctx := utils.GetRedisClient() + jsonData, _ := json.Marshal(taskInfo) + _, err := rdb.Set(ctx, fmt.Sprintf("015:taskInfoMap:%s", taskId), jsonData, time.Hour).Result() + return err +} diff --git a/worker/internal/services/file.go b/worker/internal/services/file.go new file mode 100644 index 0000000..d14eb9b --- /dev/null +++ b/worker/internal/services/file.go @@ -0,0 +1,52 @@ +package services + +import ( + "errors" + "os" + "path/filepath" + "worker/internal/models" + "worker/internal/utils" +) + +// 生成标准格式的file +func GenStandardFile(filePath string, mimeType string) (string, error) { + if _, err := os.Stat(filePath); os.IsNotExist(err) { + return "", errors.New("文件不存在") + } + compressedFile, err := os.Open(filePath) + if err != nil { + return "", err + } + defer compressedFile.Close() + + compressedFileInfo, err := compressedFile.Stat() + if err != nil { + return "", err + } + compressedFileSize := compressedFileInfo.Size() + + compressedFileHash, err := utils.GetFileMd5(compressedFile) + if err != nil { + return "", err + } + + compressedFileId := utils.GetFileId(compressedFileHash, compressedFileSize) + + uploadPath, err := utils.GetUploadDirPath() + if err != nil { + return "", err + } + newPath := filepath.Join(uploadPath, compressedFileId) + if err := os.Rename(filePath, newPath); err != nil { + return "", err + } + models.SetRedisFileInfo(compressedFileId, models.RedisFileInfo{ + FileInfo: models.FileInfo{ + FileSize: compressedFileSize, + FileHash: compressedFileHash, + MimeType: mimeType, + }, + }) + + return compressedFileId, nil +} diff --git a/worker/main.go b/worker/main.go index 1219551..66204e4 100644 --- a/worker/main.go +++ b/worker/main.go @@ -11,7 +11,7 @@ import ( func main() { srv := asynq.NewServer( - asynq.RedisClientOpt{Addr: utils.GetEnv("REDIS_URL")}, + utils.RedisURI2AsynqOpt(utils.GetEnv("REDIS_URL")), asynq.Config{Concurrency: cast.ToInt(utils.GetEnvWithDefault("WORKER_CONCURRENCY", "4"))}, )