定时任务
VEF Framework 提供了定时任务支持,基于 cron 表达式调度任务。
配置定时任务
yaml
# config.yaml
cron:
enabled: true
timezone: "Asia/Shanghai"定义任务
基本任务
go
package jobs
import (
"github.com/ilxqx/vef-framework-go/cron"
)
// CleanupJob cleans up expired data
type CleanupJob struct{}
func NewCleanupJob() cron.Job {
return &CleanupJob{}
}
// Name returns the job name
func (j *CleanupJob) Name() string {
return "cleanup"
}
// Schedule returns the cron expression
func (j *CleanupJob) Schedule() string {
return "0 2 * * *" // Run at 2:00 AM daily
}
// Run executes the job
func (j *CleanupJob) Run() error {
// Cleanup logic
log.Info("Running cleanup job")
// Delete expired sessions
orm.DB().Where("expired_at < ?", time.Now()).Delete(&models.Session{})
// Delete old logs
orm.DB().Where("created_at < ?", time.Now().AddDate(0, -1, 0)).Delete(&models.AuditLog{})
return nil
}带依赖的任务
go
type ReportJob struct {
emailService *services.EmailService
reportService *services.ReportService
}
func NewReportJob(
emailService *services.EmailService,
reportService *services.ReportService,
) cron.Job {
return &ReportJob{
emailService: emailService,
reportService: reportService,
}
}
func (j *ReportJob) Name() string {
return "daily-report"
}
func (j *ReportJob) Schedule() string {
return "0 8 * * *" // Run at 8:00 AM daily
}
func (j *ReportJob) Run() error {
// Generate report
report, err := j.reportService.GenerateDailyReport()
if err != nil {
return err
}
// Send report via email
return j.emailService.SendReport(report)
}注册任务
go
func NewCronModule() fx.Option {
return fx.Options(
fx.Provide(
jobs.NewCleanupJob,
jobs.NewReportJob,
),
fx.Invoke(RegisterJobs),
)
}
func RegisterJobs(
scheduler *cron.Scheduler,
cleanupJob *jobs.CleanupJob,
reportJob *jobs.ReportJob,
) {
scheduler.Register(cleanupJob)
scheduler.Register(reportJob)
}Cron 表达式
表达式格式
┌───────────── 分钟 (0 - 59)
│ ┌───────────── 小时 (0 - 23)
│ │ ┌───────────── 日 (1 - 31)
│ │ │ ┌───────────── 月 (1 - 12)
│ │ │ │ ┌───────────── 星期 (0 - 6, 0 = 周日)
│ │ │ │ │
* * * * *常用表达式
| 表达式 | 说明 |
|---|---|
* * * * * | 每分钟 |
0 * * * * | 每小时 |
0 0 * * * | 每天午夜 |
0 0 * * 0 | 每周日午夜 |
0 0 1 * * | 每月1日午夜 |
0 8 * * 1-5 | 工作日早上8点 |
*/5 * * * * | 每5分钟 |
0 9,18 * * * | 每天9点和18点 |
任务管理
启用/禁用任务
go
// Disable job
scheduler.Disable("cleanup")
// Enable job
scheduler.Enable("cleanup")
// Check if job is enabled
if scheduler.IsEnabled("cleanup") {
// Job is enabled
}手动触发任务
go
// Run job immediately
err := scheduler.RunNow("cleanup")获取任务状态
go
// Get job info
info := scheduler.GetJobInfo("cleanup")
fmt.Printf("Name: %s\n", info.Name)
fmt.Printf("Schedule: %s\n", info.Schedule)
fmt.Printf("LastRun: %s\n", info.LastRun)
fmt.Printf("NextRun: %s\n", info.NextRun)
fmt.Printf("Status: %s\n", info.Status)任务锁
防止并发执行
go
type CleanupJob struct {
locker cron.Locker
}
func (j *CleanupJob) Run() error {
// Acquire lock
lock, err := j.locker.Lock("cleanup-job", time.Hour)
if err != nil {
return nil // Another instance is running
}
defer lock.Release()
// Execute job
return j.doCleanup()
}错误处理
任务重试
go
type RetryableJob struct {
maxRetries int
}
func (j *RetryableJob) Run() error {
var lastErr error
for i := 0; i < j.maxRetries; i++ {
if err := j.doWork(); err != nil {
lastErr = err
time.Sleep(time.Second * time.Duration(i+1))
continue
}
return nil
}
return lastErr
}错误通知
go
func RegisterJobErrorHandler(scheduler *cron.Scheduler, notifier *services.Notifier) {
scheduler.OnError(func(job cron.Job, err error) {
// Log error
log.WithFields(log.Fields{
"job": job.Name(),
"error": err.Error(),
}).Error("Job failed")
// Send notification
notifier.SendAlert(fmt.Sprintf("Job %s failed: %s", job.Name(), err.Error()))
})
}任务监控
记录任务执行
go
func RegisterJobLogger(scheduler *cron.Scheduler) {
scheduler.OnStart(func(job cron.Job) {
log.WithField("job", job.Name()).Info("Job started")
})
scheduler.OnComplete(func(job cron.Job, duration time.Duration, err error) {
fields := log.Fields{
"job": job.Name(),
"duration": duration.String(),
}
if err != nil {
fields["error"] = err.Error()
log.WithFields(fields).Error("Job failed")
} else {
log.WithFields(fields).Info("Job completed")
}
})
}