package service

import (
	"fmt"
	"strconv"
	"time"

	"github.com/linmadan/egglib-go/core/application"
	"gitlab.fjmaimaimai.com/allied-creation/performance/pkg/application/factory"
	"gitlab.fjmaimaimai.com/allied-creation/performance/pkg/application/staff_assess/command"
	"gitlab.fjmaimaimai.com/allied-creation/performance/pkg/application/staff_assess/service"
	"gitlab.fjmaimaimai.com/allied-creation/performance/pkg/domain"
	"gitlab.fjmaimaimai.com/allied-creation/performance/pkg/log"
	"gitlab.fjmaimaimai.com/allied-creation/performance/pkg/utils"
)

// NodeTaskService sends due evaluation-node tasks and reschedules (or cancels)
// their next send time. It holds no state.
type NodeTaskService struct {
}

// NewNodeTaskService returns a ready-to-use NodeTaskService.
func NewNodeTaskService() *NodeTaskService {
	return &NodeTaskService{}
}

// SendEvaluationNode sends every evaluation node task that is currently due.
//
// It loads the tasks matching "lessNextSentAt" = now together with the
// projects and cycles they reference inside one transaction, commits it, and
// then handles the tasks project by project: tasks whose project was found are
// passed to taskSend, tasks whose project was not found are passed to
// taskAbort. Each of those helpers opens its own transaction.
//
// The first error returned by a query, a commit, or a helper is returned
// immediately; projects not yet processed are left untouched.
//
// NOTE(review): a panic inside this function is recovered and logged by the
// deferred handler, after which the function returns a nil error.
func (rs *NodeTaskService) SendEvaluationNode() error {
	transactionContext, err := factory.StartTransaction()
	if err != nil {
		return err
	}
	defer func() {
		// Also runs after a successful commit, as in the other methods of this type.
		_ = transactionContext.RollbackTransaction()
		if err := recover(); err != nil {
			log.Logger.Error(application.ThrowError(application.BUSINESS_ERROR, fmt.Sprintf("定时发送评估任务[基础查询]异常:%s", err)).Error())
		}
	}()

	taskRepository := factory.CreateNodeTaskRepository(map[string]interface{}{"transactionContext": transactionContext})
	tasks, err := taskRepository.Find(map[string]interface{}{"lessNextSentAt": time.Now().Local()})
	if err != nil {
		return application.ThrowError(application.INTERNAL_SERVER_ERROR, err.Error())
	}
	if len(tasks) == 0 {
		return nil
	}

	// Seed both maps with nil so their keys form the de-duplicated id sets.
	// An entry that is still nil after the lookups below means "referenced by
	// a task but not found".
	projectMap := map[int64]*domain.EvaluationProject{}
	cycleMap := map[int64]*domain.EvaluationCycle{}
	for i := range tasks {
		projectMap[tasks[i].ProjectId] = nil
		cycleMap[tasks[i].CycleId] = nil
	}
	projectIds := make([]int64, 0, len(projectMap))
	for id := range projectMap {
		projectIds = append(projectIds, id)
	}
	cycleIds := make([]int64, 0, len(cycleMap))
	for id := range cycleMap {
		cycleIds = append(cycleIds, id)
	}

	if len(projectIds) > 0 {
		projectRepository := factory.CreateEvaluationProjectRepository(map[string]interface{}{"transactionContext": transactionContext})
		_, projects, err := projectRepository.Find(map[string]interface{}{"ids": projectIds}, "template")
		if err != nil {
			return application.ThrowError(application.INTERNAL_SERVER_ERROR, err.Error())
		}
		for i := range projects {
			projectMap[projects[i].Id] = projects[i]
		}
	}

	if len(cycleIds) > 0 {
		cycleRepository := factory.CreateEvaluationCycleRepository(map[string]interface{}{"transactionContext": transactionContext})
		_, cycles, err := cycleRepository.Find(map[string]interface{}{"ids": cycleIds})
		if err != nil {
			return application.ThrowError(application.INTERNAL_SERVER_ERROR, err.Error())
		}
		for i := range cycles {
			cycleMap[cycles[i].Id] = cycles[i]
		}
	}

	if err = transactionContext.CommitTransaction(); err != nil {
		return application.ThrowError(application.TRANSACTION_ERROR, err.Error())
	}

	// Group the tasks by project so that all nodes of one project are sent together.
	taskMap := map[int64][]*domain.NodeTask{}
	for i := range tasks {
		task := tasks[i]
		taskMap[task.ProjectId] = append(taskMap[task.ProjectId], task)
	}

	for projectId, projectTasks := range taskMap {
		if project := projectMap[projectId]; project != nil {
			if err = rs.taskSend(project, projectTasks, cycleMap); err != nil {
				return err
			}
			continue
		}
		// The project was not found: cancel further sends for its tasks.
		if err = rs.taskAbort(projectTasks); err != nil {
			return err
		}
	}

	return nil
}

// taskSend creates one staff-assessment task for project from the given node
// tasks and advances each node task to its next send time, all inside a
// single transaction.
//
// cycleMap maps cycle ids to cycles; an entry may be missing or nil when the
// cycle was not found, in which case the cycle name is left empty.
//
// NOTE(review): a panic inside this function is recovered and logged by the
// deferred handler, after which the function returns a nil error while the
// transaction has been rolled back.
func (rs *NodeTaskService) taskSend(project *domain.EvaluationProject, tasks []*domain.NodeTask, cycleMap map[int64]*domain.EvaluationCycle) error {
	const timeLayout = "2006-01-02 15:04:05"

	transactionContext, err := factory.StartTransaction()
	if err != nil {
		return err
	}
	defer func() {
		_ = transactionContext.RollbackTransaction()
		if err := recover(); err != nil {
			log.Logger.Error(application.ThrowError(application.BUSINESS_ERROR, fmt.Sprintf("定时发送评估任务异常:%s", err)).Error())
		}
	}()
	taskRepository := factory.CreateNodeTaskRepository(map[string]interface{}{"transactionContext": transactionContext})

	now := time.Now().Local()

	csat := &command.CreateStaffAssessTask{
		CompanyId:             int(project.CompanyId),
		EvaluationProjectId:   int(project.Id),
		EvaluationProjectName: project.Name,
		CycleId:               project.CycleId,
		StepList:              make([]command.AssessTaskStep, 0, len(tasks)),
	}

	// Cycle name. The caller seeds cycleMap with nil placeholders, so a key
	// can be present with a nil value; dereferencing it would panic.
	if cycle, ok := cycleMap[project.CycleId]; ok && cycle != nil {
		csat.CycleName = cycle.Name
	}

	// Recipients. An id that does not parse as an integer is logged and
	// skipped rather than silently added as executor 0.
	csat.ExecutorId = make([]int, 0, len(project.Recipients))
	for _, recipient := range project.Recipients {
		executorId, convErr := strconv.Atoi(recipient)
		if convErr != nil {
			log.Logger.Error(fmt.Sprintf("定时发送评估任务: 无效的接收人ID %q: %s", recipient, convErr))
			continue
		}
		csat.ExecutorId = append(csat.ExecutorId, executorId)
	}

	for i := range tasks {
		task := tasks[i]

		// Deadline of the whole node.
		maxTime := task.TimeEnd.Local()

		// Record this run as the task's most recent send time.
		task.LastSentAt = &now

		// Current sub-cycle range [start, end]. The start must be read before
		// NextSentAt is overwritten below.
		cycleTimeStart := task.NextSentAt.Local()
		var cycleTimeEnd time.Time

		// Start of the next sub-cycle; nil when it would fall after the
		// node deadline, which stops further sends for this task.
		nextTime := utils.NextTimeInc(cycleTimeStart, task.KpiCycle)
		if nextTime.After(maxTime) {
			task.NextSentAt = nil
		} else {
			task.NextSentAt = &nextTime
		}

		// Persist the updated schedule.
		// NOTE(review): Insert is presumably an upsert for existing tasks — confirm.
		if _, err = taskRepository.Insert(task); err != nil {
			return application.ThrowError(application.INTERNAL_SERVER_ERROR, err.Error())
		}

		// End of the current sub-cycle. Per the product requirement the
		// submission window is extended by 8.5 hours, i.e. to 08:30.
		if task.NextSentAt == nil {
			// Last sub-cycle: 08:30 local time on the day after the node deadline.
			maxYear, maxMonth, maxDay := maxTime.Date()
			cycleTimeEnd = time.Date(maxYear, maxMonth, maxDay, 0, 0, 0, 0, time.Local)
			cycleTimeEnd = cycleTimeEnd.Add(24*time.Hour + 8*time.Hour + 30*time.Minute)
		} else {
			// Otherwise: start of the next sub-cycle plus 8.5 hours.
			cycleTimeEnd = task.NextSentAt.Local().Add(8*time.Hour + 30*time.Minute)
		}

		fmCycleStartTime := cycleTimeStart.Format(timeLayout)
		fmCycleTimeEnd := cycleTimeEnd.Format(timeLayout)

		// The assessment task as a whole takes the range of the first node task.
		if len(csat.BeginTime) == 0 {
			csat.BeginTime = fmCycleStartTime
		}
		if len(csat.EndTime) == 0 {
			csat.EndTime = fmCycleTimeEnd
		}
		csat.StepList = append(csat.StepList, command.AssessTaskStep{
			SortBy:       task.NodeSort,
			LinkNodeId:   int(task.NodeId),
			LinkNodeName: task.NodeName,
			LinkNodeType: task.NodeType,
			BeginTime:    fmCycleStartTime,
			EndTime:      fmCycleTimeEnd,
		})
	}

	// Create the assessment task within the same transaction.
	_, err = service.NewStaffAssessServeice().CreateStaffAssessTask(transactionContext, csat)
	if err != nil {
		return application.ThrowError(application.INTERNAL_SERVER_ERROR, "创建发送任务"+err.Error())
	}

	if err = transactionContext.CommitTransaction(); err != nil {
		return application.ThrowError(application.TRANSACTION_ERROR, err.Error())
	}

	return nil
}

// taskAbort cancels future sends for the given node tasks by clearing their
// NextSentAt and persisting them in a single transaction. It is used for
// tasks whose project was not found.
func (rs *NodeTaskService) taskAbort(tasks []*domain.NodeTask) error {
	transactionContext, err := factory.StartTransaction()
	if err != nil {
		return err
	}
	defer func() {
		_ = transactionContext.RollbackTransaction()
	}()
	taskRepository := factory.CreateNodeTaskRepository(map[string]interface{}{"transactionContext": transactionContext})

	for i := range tasks {
		task := tasks[i]
		// The project no longer exists: stop the periodic send.
		task.NextSentAt = nil
		if _, err = taskRepository.Insert(task); err != nil {
			return application.ThrowError(application.INTERNAL_SERVER_ERROR, err.Error())
		}
	}

	if err = transactionContext.CommitTransaction(); err != nil {
		return application.ThrowError(application.TRANSACTION_ERROR, err.Error())
	}

	return nil
}