作者 郑周

1. 优化任务发送(组合所有任务节点后统一发送)

... ... @@ -33,143 +33,295 @@ func (rs *NodeTaskService) SendEvaluationNode() error {
transactionContext.RollbackTransaction()
if err := recover(); err != nil {
log.Logger.Error(application.ThrowError(application.BUSINESS_ERROR, fmt.Sprintf("定时发送评估任务异常:%s", err)).Error())
log.Logger.Error(application.ThrowError(application.BUSINESS_ERROR, fmt.Sprintf("定时发送评估任务[基础查询]异常:%s", err)).Error())
}
}()
taskRepository := factory.CreateNodeTaskRepository(map[string]interface{}{"transactionContext": transactionContext})
tasks, err := taskRepository.Find(map[string]interface{}{"now": time.Now().Local()})
tasks, err := taskRepository.Find(map[string]interface{}{"lessNextSentAt": time.Now().Local()})
if err != nil {
return application.ThrowError(application.INTERNAL_SERVER_ERROR, err.Error())
}
if len(tasks) == 0 {
return nil
}
projectRepository := factory.CreateEvaluationProjectRepository(map[string]interface{}{"transactionContext": transactionContext})
cycleRepository := factory.CreateEvaluationCycleRepository(map[string]interface{}{"transactionContext": transactionContext})
projectIdsMap := map[int64]*domain.EvaluationProject{}
cycleIdsMap := map[int64]*domain.EvaluationCycle{}
projectMap := map[int64]*domain.EvaluationProject{}
cycleMap := map[int64]*domain.EvaluationCycle{}
for i := range tasks {
task := tasks[i]
projectIdsMap[task.ProjectId] = nil
cycleIdsMap[task.CycleId] = nil
projectMap[tasks[i].ProjectId] = nil
cycleMap[tasks[i].CycleId] = nil
}
projectIds := make([]int64, 0)
cycleIds := make([]int64, 0)
for k := range projectIdsMap {
for k := range projectMap {
projectIds = append(projectIds, k)
}
for k := range cycleIdsMap {
for k := range cycleMap {
cycleIds = append(cycleIds, k)
}
_, projects, err := projectRepository.Find(map[string]interface{}{"ids": projectIds}, "template")
if err != nil {
return application.ThrowError(application.INTERNAL_SERVER_ERROR, err.Error())
if len(projectIds) > 0 {
projectRepository := factory.CreateEvaluationProjectRepository(map[string]interface{}{"transactionContext": transactionContext})
_, projects, err := projectRepository.Find(map[string]interface{}{"ids": projectIds}, "template")
if err != nil {
return application.ThrowError(application.INTERNAL_SERVER_ERROR, err.Error())
}
for i := range projects {
projectMap[projects[i].Id] = projects[i]
}
}
_, cycles, err := cycleRepository.Find(map[string]interface{}{"ids": cycleIds})
if err != nil {
return application.ThrowError(application.INTERNAL_SERVER_ERROR, err.Error())
if len(cycleIds) > 0 {
cycleRepository := factory.CreateEvaluationCycleRepository(map[string]interface{}{"transactionContext": transactionContext})
_, cycles, err := cycleRepository.Find(map[string]interface{}{"ids": cycleIds})
if err != nil {
return application.ThrowError(application.INTERNAL_SERVER_ERROR, err.Error())
}
for i := range cycles {
cycleMap[cycles[i].Id] = cycles[i]
}
}
if err = transactionContext.CommitTransaction(); err != nil {
return application.ThrowError(application.TRANSACTION_ERROR, err.Error())
}
for i := range projects {
projectIdsMap[projects[i].Id] = projects[i]
// 相同项目节点先聚合
taskMap := map[int64][]*domain.NodeTask{}
for i := range tasks {
task := tasks[i]
array, ok := taskMap[task.ProjectId]
if !ok {
array = make([]*domain.NodeTask, 0)
}
taskMap[task.ProjectId] = append(array, task)
}
for i := range cycles {
cycleIdsMap[cycles[i].Id] = cycles[i]
for k, v := range taskMap {
project, ok := projectMap[k]
if ok && project != nil {
if err = rs.taskSend(project, v, cycleMap); err != nil {
return err
}
} else {
if err = rs.taskAbort(v); err != nil {
return err
}
}
}
staffAssessService := service.NewStaffAssessServeice()
//staffAssessService := service.NewStaffAssessServeice()
//now := time.Now().Local()
//for i := range tasks {
// task := tasks[i]
// project, ok := projectMap[task.ProjectId] // 项目还存在
// if ok && project != nil {
// // 环节截止时间
// maxTime := task.TimeEnd.Local()
//
// // 更新任务最后一次的发送时间(取当前时间)
// task.LastSentAt = &now
//
// // 当前周起始时间和截止时间
// var cycleTimeStart = task.NextSentAt.Local()
// var cycleTimeEnd time.Time
//
// // 下个周期起始时间
// nextTime := utils.NextTimeInc(cycleTimeStart, task.KpiCycle)
// // 超过截止时间
// if nextTime.After(maxTime) {
// task.NextSentAt = nil
// } else {
// task.NextSentAt = &nextTime
// }
//
// // 周期的截至时间=下一个周期的开始时间-1秒(需求方要求提交数据时间延长到第二天8点30分截止)
// if task.NextSentAt == nil {
// //cycleTimeEnd = maxTime
// maxYear, maxMonth, maxDay := maxTime.Date()
// cycleTimeEnd = time.Date(maxYear, maxMonth, maxDay, 0, 0, 0, 0, time.Local)
// cycleTimeEnd = cycleTimeEnd.Add(24*time.Hour + 8*time.Hour + 30*time.Minute) // 注.延长8.5小时
// } else {
// //cycleTimeEnd = task.NextSentAt.Local().Add(-1 * time.Second) // 周期截至时间=下一个周期起始时间-1秒
// cycleTimeEnd = task.NextSentAt.Local().Add(8*time.Hour + 30*time.Minute) // 注.延长8.5小时
// }
//
// // 格式化周期的起始和截止时间
// fmCycleStartTime := cycleTimeStart.Format("2006-01-02 15:04:05")
// fmCycleTimeEnd := cycleTimeEnd.Format("2006-01-02 15:04:05")
//
// csat := &command.CreateStaffAssessTask{
// CompanyId: int(project.CompanyId),
// EvaluationProjectId: int(project.Id),
// EvaluationProjectName: project.Name,
// CycleId: project.CycleId,
// StepList: make([]command.AssessTaskStep, 0),
// }
//
// // 周期名称
// if cycle, ok := cycleMap[project.CycleId]; ok {
// csat.CycleName = cycle.Name
// }
//
// // 接收人
// csat.ExecutorId = make([]int, 0)
// for rIndex := range project.Recipients {
// vInt, _ := strconv.Atoi(project.Recipients[rIndex])
// csat.ExecutorId = append(csat.ExecutorId, vInt)
// }
//
// csat.BeginTime = fmCycleStartTime
// csat.EndTime = fmCycleTimeEnd
// csat.StepList = append(csat.StepList, command.AssessTaskStep{
// SortBy: task.NodeSort,
// LinkNodeId: int(task.NodeId),
// LinkNodeName: task.NodeName,
// LinkNodeType: task.NodeType,
// BeginTime: fmCycleStartTime,
// EndTime: fmCycleTimeEnd,
// })
//
// // 创建发送任务
// _, err := staffAssessService.CreateStaffAssessTask(transactionContext, csat)
// if err != nil {
// return application.ThrowError(application.INTERNAL_SERVER_ERROR, "创建发送任务"+err.Error())
// }
// } else {
// task.NextSentAt = nil // 项目不存在,取消周期任务发送
// }
//
// task, err := taskRepository.Insert(task)
// if err != nil {
// return application.ThrowError(application.INTERNAL_SERVER_ERROR, err.Error())
// }
//}
return nil
}
func (rs *NodeTaskService) taskSend(project *domain.EvaluationProject, tasks []*domain.NodeTask, cycleMap map[int64]*domain.EvaluationCycle) error {
transactionContext, err := factory.StartTransaction()
if err != nil {
return err
}
defer func() {
_ = transactionContext.RollbackTransaction()
if err := recover(); err != nil {
log.Logger.Error(application.ThrowError(application.BUSINESS_ERROR, fmt.Sprintf("定时发送评估任务异常:%s", err)).Error())
}
}()
taskRepository := factory.CreateNodeTaskRepository(map[string]interface{}{"transactionContext": transactionContext})
now := time.Now().Local()
csat := &command.CreateStaffAssessTask{
CompanyId: int(project.CompanyId),
EvaluationProjectId: int(project.Id),
EvaluationProjectName: project.Name,
CycleId: project.CycleId,
StepList: make([]command.AssessTaskStep, 0),
}
// 周期名称
if cycle, ok := cycleMap[project.CycleId]; ok {
csat.CycleName = cycle.Name
}
// 接收人
csat.ExecutorId = make([]int, 0)
for rIndex := range project.Recipients {
vInt, _ := strconv.Atoi(project.Recipients[rIndex])
csat.ExecutorId = append(csat.ExecutorId, vInt)
}
for i := range tasks {
task := tasks[i]
project, ok := projectIdsMap[task.ProjectId] // 项目
if ok && project != nil {
// 环节截止时间
maxTime := task.TimeEnd.Local()
// 更新任务最后一次的发送时间(取当前时间)
task.LastSentAt = &now
// 当前周起始时间和截止时间
var cycleTimeStart = task.NextSentAt.Local()
var cycleTimeEnd time.Time
// 下个周期起始时间
nextTime := utils.NextTimeInc(cycleTimeStart, task.KpiCycle)
// 超过截止时间
if nextTime.After(maxTime) {
task.NextSentAt = nil
} else {
task.NextSentAt = &nextTime
}
// 周期的截至时间=下一个周期的开始时间-1秒(需求方要求提交数据时间延长到第二天8点30分截止)
if task.NextSentAt == nil {
//cycleTimeEnd = maxTime
maxYear, maxMonth, maxDay := maxTime.Date()
cycleTimeEnd = time.Date(maxYear, maxMonth, maxDay, 0, 0, 0, 0, time.Local)
cycleTimeEnd = cycleTimeEnd.Add(24*time.Hour + 8*time.Hour + 30*time.Minute) // 注.延长8.5小时
} else {
//cycleTimeEnd = task.NextSentAt.Local().Add(-1 * time.Second) // 周期截至时间=下一个周期起始时间-1秒
cycleTimeEnd = task.NextSentAt.Local().Add(8*time.Hour + 30*time.Minute) // 注.延长8.5小时
}
// 环节截止时间
maxTime := task.TimeEnd.Local()
// 格式化周期的起始和截止时间
fmCycleStartTime := cycleTimeStart.Format("2006-01-02 15:04:05")
fmCycleTimeEnd := cycleTimeEnd.Format("2006-01-02 15:04:05")
// 更新任务最后一次的发送时间(取当前时间)
task.LastSentAt = &now
csat := &command.CreateStaffAssessTask{
CompanyId: int(project.CompanyId),
EvaluationProjectId: int(project.Id),
EvaluationProjectName: project.Name,
CycleId: project.CycleId,
StepList: make([]command.AssessTaskStep, 0),
}
// 当前小周期范围[起始时间-截止时间]
var cycleTimeStart = task.NextSentAt.Local()
var cycleTimeEnd time.Time
// 周期名称
if cycle, ok := cycleIdsMap[project.CycleId]; ok {
csat.CycleName = cycle.Name
}
// 下个周期起始时间
nextTime := utils.NextTimeInc(cycleTimeStart, task.KpiCycle)
// 超过截止时间
if nextTime.After(maxTime) {
task.NextSentAt = nil
} else {
task.NextSentAt = &nextTime
}
// 更新下个周期
_, err = taskRepository.Insert(task)
if err != nil {
return application.ThrowError(application.INTERNAL_SERVER_ERROR, err.Error())
}
// 接收人
csat.ExecutorId = make([]int, 0)
for rIndex := range project.Recipients {
vInt, _ := strconv.Atoi(project.Recipients[rIndex])
csat.ExecutorId = append(csat.ExecutorId, vInt)
}
// 周期的截至时间=下一个周期的开始时间-1秒(需求方要求:提交数据时间延长到第二天8点30分截止)
if task.NextSentAt == nil {
//cycleTimeEnd = maxTime
maxYear, maxMonth, maxDay := maxTime.Date()
cycleTimeEnd = time.Date(maxYear, maxMonth, maxDay, 0, 0, 0, 0, time.Local)
cycleTimeEnd = cycleTimeEnd.Add(24*time.Hour + 8*time.Hour + 30*time.Minute) // 注.延长8.5小时
} else {
//cycleTimeEnd = task.NextSentAt.Local().Add(-1 * time.Second) // 周期截至时间=下一个周期起始时间-1秒
cycleTimeEnd = task.NextSentAt.Local().Add(8*time.Hour + 30*time.Minute) // 注.延长8.5小时
}
// 格式化周期的起始和截止时间
fmCycleStartTime := cycleTimeStart.Format("2006-01-02 15:04:05")
fmCycleTimeEnd := cycleTimeEnd.Format("2006-01-02 15:04:05")
// 格式化周期的起始和截止时间
if len(csat.BeginTime) == 0 {
csat.BeginTime = fmCycleStartTime
}
if len(csat.EndTime) == 0 {
csat.EndTime = fmCycleTimeEnd
csat.StepList = append(csat.StepList, command.AssessTaskStep{
SortBy: task.NodeSort,
LinkNodeId: int(task.NodeId),
LinkNodeName: task.NodeName,
LinkNodeType: task.NodeType,
BeginTime: fmCycleStartTime,
EndTime: fmCycleTimeEnd,
})
// 创建发送任务
_, err := staffAssessService.CreateStaffAssessTask(transactionContext, csat)
if err != nil {
return application.ThrowError(application.INTERNAL_SERVER_ERROR, "创建发送任务"+err.Error())
}
} else {
task.NextSentAt = nil // 项目不存在,取消周期任务发送
}
csat.StepList = append(csat.StepList, command.AssessTaskStep{
SortBy: task.NodeSort,
LinkNodeId: int(task.NodeId),
LinkNodeName: task.NodeName,
LinkNodeType: task.NodeType,
BeginTime: fmCycleStartTime,
EndTime: fmCycleTimeEnd,
})
}
task, err := taskRepository.Insert(task)
if err != nil {
return application.ThrowError(application.INTERNAL_SERVER_ERROR, err.Error())
}
// 创建发送任务
_, err = service.NewStaffAssessServeice().CreateStaffAssessTask(transactionContext, csat)
if err != nil {
return application.ThrowError(application.INTERNAL_SERVER_ERROR, "创建发送任务"+err.Error())
}
if err := transactionContext.CommitTransaction(); err != nil {
if err = transactionContext.CommitTransaction(); err != nil {
return application.ThrowError(application.TRANSACTION_ERROR, err.Error())
}
return nil
}
// 节点任务中止
func (rs *NodeTaskService) taskAbort(tasks []*domain.NodeTask) error {
transactionContext, err := factory.StartTransaction()
if err != nil {
return err
}
defer func() {
transactionContext.RollbackTransaction()
}()
taskRepository := factory.CreateNodeTaskRepository(map[string]interface{}{"transactionContext": transactionContext})
for i := range tasks {
task := tasks[i]
task.NextSentAt = nil // 项目不存在,取消周期任务发送
_, err = taskRepository.Insert(task)
if err != nil {
return application.ThrowError(application.INTERNAL_SERVER_ERROR, err.Error())
}
}
if err = transactionContext.CommitTransaction(); err != nil {
return application.ThrowError(application.TRANSACTION_ERROR, err.Error())
}
return nil
}
... ...
... ... @@ -131,7 +131,7 @@ func (repo *NodeTaskRepository) Find(queryOptions map[string]interface{}) ([]*do
var m []*models.NodeTask
query := tx.Model(&m).Where("deleted_at isnull")
if v, ok := queryOptions["now"].(time.Time); ok {
if v, ok := queryOptions["lessNextSentAt"].(time.Time); ok {
query.Where("next_sent_at <= ?", v)
}
... ...