mirror of
https://github.com/keven1024/015.git
synced 2026-05-26 07:08:02 +00:00
feat(backend): implement Redis file and task management with logging support
This commit is contained in:
60
worker/internal/models/file.go
Normal file
60
worker/internal/models/file.go
Normal file
@@ -0,0 +1,60 @@
|
||||
package models
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"worker/internal/utils"
|
||||
|
||||
"dario.cat/mergo"
|
||||
"github.com/redis/go-redis/v9"
|
||||
)
|
||||
|
||||
type FileInfo struct {
|
||||
FileSize int64 `json:"size"`
|
||||
MimeType string `json:"mime_type"`
|
||||
FileHash string `json:"hash"`
|
||||
ChunkSize int64 `json:"chunk_size"`
|
||||
}
|
||||
|
||||
type FileType string
|
||||
|
||||
const (
|
||||
FileTypeInit FileType = "init"
|
||||
FileTypeUpload FileType = "already"
|
||||
)
|
||||
|
||||
type RedisFileInfo struct {
|
||||
FileInfo
|
||||
FileType FileType `json:"type"`
|
||||
CreatedAt int64 `json:"created_at"`
|
||||
Expire int64 `json:"expire"` // 只有上传文件(init)的时候有这个字段
|
||||
}
|
||||
|
||||
func GetRedisFileInfo(fileId string) (*RedisFileInfo, error) {
|
||||
rdb, ctx := utils.GetRedisClient()
|
||||
fileInfoUnmarshalData, err := rdb.HGet(ctx, "015:fileInfoMap", fileId).Result()
|
||||
if err == redis.Nil {
|
||||
return nil, nil
|
||||
}
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
var fileInfoData RedisFileInfo
|
||||
if err := json.Unmarshal([]byte(fileInfoUnmarshalData), &fileInfoData); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &fileInfoData, nil
|
||||
}
|
||||
|
||||
func SetRedisFileInfo(fileId string, fileInfo RedisFileInfo) error {
|
||||
rdb, ctx := utils.GetRedisClient()
|
||||
old_fileInfo, err := GetRedisFileInfo(fileId)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if old_fileInfo != nil {
|
||||
mergo.Merge(&fileInfo, old_fileInfo)
|
||||
}
|
||||
jsonData, _ := json.Marshal(fileInfo)
|
||||
_, err = rdb.HSet(ctx, "015:fileInfoMap", fileId, string(jsonData)).Result()
|
||||
return err
|
||||
}
|
||||
35
worker/internal/models/task.go
Normal file
35
worker/internal/models/task.go
Normal file
@@ -0,0 +1,35 @@
|
||||
package models
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"time"
|
||||
"worker/internal/utils"
|
||||
|
||||
"github.com/redis/go-redis/v9"
|
||||
)
|
||||
|
||||
func GetRedisTaskInfo(taskId string) (*map[string]any, error) {
|
||||
rdb, ctx := utils.GetRedisClient()
|
||||
taskInfo := rdb.Get(ctx, fmt.Sprintf("015:taskInfoMap:%s", taskId))
|
||||
taskInfoUnmarshalData, err := taskInfo.Result()
|
||||
if err == redis.Nil {
|
||||
return nil, nil
|
||||
}
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
var taskInfoData map[string]any
|
||||
|
||||
if err := json.Unmarshal([]byte(taskInfoUnmarshalData), &taskInfoData); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &taskInfoData, nil
|
||||
}
|
||||
|
||||
func SetRedisTaskInfo(taskId string, taskInfo map[string]any) error {
|
||||
rdb, ctx := utils.GetRedisClient()
|
||||
jsonData, _ := json.Marshal(taskInfo)
|
||||
_, err := rdb.Set(ctx, fmt.Sprintf("015:taskInfoMap:%s", taskId), jsonData, time.Hour).Result()
|
||||
return err
|
||||
}
|
||||
52
worker/internal/services/file.go
Normal file
52
worker/internal/services/file.go
Normal file
@@ -0,0 +1,52 @@
|
||||
package services
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"worker/internal/models"
|
||||
"worker/internal/utils"
|
||||
)
|
||||
|
||||
// 生成标准格式的file
|
||||
func GenStandardFile(filePath string, mimeType string) (string, error) {
|
||||
if _, err := os.Stat(filePath); os.IsNotExist(err) {
|
||||
return "", errors.New("文件不存在")
|
||||
}
|
||||
compressedFile, err := os.Open(filePath)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
defer compressedFile.Close()
|
||||
|
||||
compressedFileInfo, err := compressedFile.Stat()
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
compressedFileSize := compressedFileInfo.Size()
|
||||
|
||||
compressedFileHash, err := utils.GetFileMd5(compressedFile)
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
|
||||
compressedFileId := utils.GetFileId(compressedFileHash, compressedFileSize)
|
||||
|
||||
uploadPath, err := utils.GetUploadDirPath()
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
newPath := filepath.Join(uploadPath, compressedFileId)
|
||||
if err := os.Rename(filePath, newPath); err != nil {
|
||||
return "", err
|
||||
}
|
||||
models.SetRedisFileInfo(compressedFileId, models.RedisFileInfo{
|
||||
FileInfo: models.FileInfo{
|
||||
FileSize: compressedFileSize,
|
||||
FileHash: compressedFileHash,
|
||||
MimeType: mimeType,
|
||||
},
|
||||
})
|
||||
|
||||
return compressedFileId, nil
|
||||
}
|
||||
@@ -11,7 +11,7 @@ import (
|
||||
|
||||
func main() {
|
||||
srv := asynq.NewServer(
|
||||
asynq.RedisClientOpt{Addr: utils.GetEnv("REDIS_URL")},
|
||||
utils.RedisURI2AsynqOpt(utils.GetEnv("REDIS_URL")),
|
||||
asynq.Config{Concurrency: cast.ToInt(utils.GetEnvWithDefault("WORKER_CONCURRENCY", "4"))},
|
||||
)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user