Files
huobao-drama/infrastructure/scheduler/resource_transfer_scheduler.go
Connor 9600fc542c init
2026-01-12 13:17:11 +08:00

241 lines
6.4 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
package scheduler
import (
"time"
"github.com/drama-generator/backend/application/services"
"github.com/drama-generator/backend/pkg/logger"
"github.com/robfig/cron/v3"
"gorm.io/gorm"
)
type ResourceTransferScheduler struct {
cron *cron.Cron
transferService *services.ResourceTransferService
db *gorm.DB
log *logger.Logger
running bool
}
func NewResourceTransferScheduler(
transferService *services.ResourceTransferService,
db *gorm.DB,
log *logger.Logger,
) *ResourceTransferScheduler {
return &ResourceTransferScheduler{
cron: cron.New(cron.WithSeconds()),
transferService: transferService,
db: db,
log: log,
running: false,
}
}
// Start 启动定时任务
func (s *ResourceTransferScheduler) Start() error {
if s.running {
s.log.Warn("Resource transfer scheduler already running")
return nil
}
s.log.Info("Starting resource transfer scheduler...")
// 每小时执行一次资源转存任务
_, err := s.cron.AddFunc("0 0 * * * *", func() {
s.log.Info("Starting scheduled resource transfer task")
s.transferPendingResources()
})
if err != nil {
return err
}
// 每天凌晨2点执行完整扫描
_, err = s.cron.AddFunc("0 0 2 * * *", func() {
s.log.Info("Starting daily full resource scan and transfer")
s.transferAllPendingResources()
})
if err != nil {
return err
}
s.cron.Start()
s.running = true
s.log.Info("Resource transfer scheduler started successfully")
return nil
}
// Stop 停止定时任务
func (s *ResourceTransferScheduler) Stop() {
if !s.running {
return
}
s.log.Info("Stopping resource transfer scheduler...")
ctx := s.cron.Stop()
<-ctx.Done()
s.running = false
s.log.Info("Resource transfer scheduler stopped")
}
// transferPendingResources 转存最近生成的待转存资源最近24小时
func (s *ResourceTransferScheduler) transferPendingResources() {
s.log.Info("Scanning for pending resources to transfer (last 24 hours)...")
// 查找最近24小时内完成的、还未转存的图片和视频
type DramaCount struct {
DramaID string
Count int64
}
// 统计每个剧本的待转存图片数量
var imageDramas []DramaCount
s.db.Raw(`
SELECT drama_id, COUNT(*) as count
FROM image_generations
WHERE status = 'completed'
AND image_url IS NOT NULL
AND image_url != ''
AND (minio_url IS NULL OR minio_url = '')
AND completed_at >= ?
GROUP BY drama_id
`, time.Now().Add(-24*time.Hour)).Scan(&imageDramas)
// 转存图片
imageCount := 0
for _, drama := range imageDramas {
count, err := s.transferService.BatchTransferImagesToMinio(drama.DramaID, 50) // 每个剧本最多转50个
if err != nil {
s.log.Errorw("Failed to transfer images for drama",
"drama_id", drama.DramaID,
"error", err)
continue
}
imageCount += count
s.log.Infow("Transferred images for drama",
"drama_id", drama.DramaID,
"count", count)
}
// 统计每个剧本的待转存视频数量
var videoDramas []DramaCount
s.db.Raw(`
SELECT drama_id, COUNT(*) as count
FROM video_generations
WHERE status = 'completed'
AND video_url IS NOT NULL
AND video_url != ''
AND (minio_url IS NULL OR minio_url = '')
AND completed_at >= ?
GROUP BY drama_id
`, time.Now().Add(-24*time.Hour)).Scan(&videoDramas)
// 转存视频
videoCount := 0
for _, drama := range videoDramas {
count, err := s.transferService.BatchTransferVideosToMinio(drama.DramaID, 50) // 每个剧本最多转50个
if err != nil {
s.log.Errorw("Failed to transfer videos for drama",
"drama_id", drama.DramaID,
"error", err)
continue
}
videoCount += count
s.log.Infow("Transferred videos for drama",
"drama_id", drama.DramaID,
"count", count)
}
s.log.Infow("Scheduled resource transfer task completed",
"images", imageCount,
"videos", videoCount)
}
// transferAllPendingResources 转存所有待转存的资源(全量扫描)
func (s *ResourceTransferScheduler) transferAllPendingResources() {
s.log.Info("Starting full scan for all pending resources...")
// 查找所有待转存的资源
type DramaCount struct {
DramaID string
Count int64
}
// 统计所有剧本的待转存图片
var imageDramas []DramaCount
s.db.Raw(`
SELECT drama_id, COUNT(*) as count
FROM image_generations
WHERE status = 'completed'
AND image_url IS NOT NULL
AND image_url != ''
AND (minio_url IS NULL OR minio_url = '')
GROUP BY drama_id
`).Scan(&imageDramas)
s.log.Infow("Found dramas with pending images", "count", len(imageDramas))
// 转存所有待转存图片
totalImageCount := 0
for _, drama := range imageDramas {
count, err := s.transferService.BatchTransferImagesToMinio(drama.DramaID, 0) // 0表示全部转存
if err != nil {
s.log.Errorw("Failed to transfer images for drama",
"drama_id", drama.DramaID,
"error", err)
continue
}
totalImageCount += count
s.log.Infow("Transferred all images for drama",
"drama_id", drama.DramaID,
"count", count)
}
// 统计所有剧本的待转存视频
var videoDramas []DramaCount
s.db.Raw(`
SELECT drama_id, COUNT(*) as count
FROM video_generations
WHERE status = 'completed'
AND video_url IS NOT NULL
AND video_url != ''
AND (minio_url IS NULL OR minio_url = '')
GROUP BY drama_id
`).Scan(&videoDramas)
s.log.Infow("Found dramas with pending videos", "count", len(videoDramas))
// 转存所有待转存视频
totalVideoCount := 0
for _, drama := range videoDramas {
count, err := s.transferService.BatchTransferVideosToMinio(drama.DramaID, 0) // 0表示全部转存
if err != nil {
s.log.Errorw("Failed to transfer videos for drama",
"drama_id", drama.DramaID,
"error", err)
continue
}
totalVideoCount += count
s.log.Infow("Transferred all videos for drama",
"drama_id", drama.DramaID,
"count", count)
}
s.log.Infow("Full resource scan and transfer completed",
"total_images", totalImageCount,
"total_videos", totalVideoCount,
"drama_count", len(imageDramas)+len(videoDramas))
}
// RunNow 立即执行一次转存任务(用于手动触发)
func (s *ResourceTransferScheduler) RunNow() {
s.log.Info("Manually triggering resource transfer task...")
go s.transferPendingResources()
}
// RunFullScan 立即执行一次全量扫描(用于手动触发)
func (s *ResourceTransferScheduler) RunFullScan() {
s.log.Info("Manually triggering full resource scan...")
go s.transferAllPendingResources()
}