作者 tangxvhui
@@ -33,66 +33,213 @@ func (rs *NodeTaskService) SendEvaluationNode() error { @@ -33,66 +33,213 @@ func (rs *NodeTaskService) SendEvaluationNode() error {
33 transactionContext.RollbackTransaction() 33 transactionContext.RollbackTransaction()
34 34
35 if err := recover(); err != nil { 35 if err := recover(); err != nil {
36 - log.Logger.Error(application.ThrowError(application.BUSINESS_ERROR, fmt.Sprintf("定时发送评估任务异常:%s", err)).Error()) 36 + log.Logger.Error(application.ThrowError(application.BUSINESS_ERROR, fmt.Sprintf("定时发送评估任务[基础查询]异常:%s", err)).Error())
37 } 37 }
38 }() 38 }()
39 taskRepository := factory.CreateNodeTaskRepository(map[string]interface{}{"transactionContext": transactionContext}) 39 taskRepository := factory.CreateNodeTaskRepository(map[string]interface{}{"transactionContext": transactionContext})
40 - tasks, err := taskRepository.Find(map[string]interface{}{"now": time.Now().Local()}) 40 + tasks, err := taskRepository.Find(map[string]interface{}{"lessNextSentAt": time.Now().Local()})
41 if err != nil { 41 if err != nil {
42 return application.ThrowError(application.INTERNAL_SERVER_ERROR, err.Error()) 42 return application.ThrowError(application.INTERNAL_SERVER_ERROR, err.Error())
43 } 43 }
44 if len(tasks) == 0 { 44 if len(tasks) == 0 {
45 return nil 45 return nil
46 } 46 }
47 - projectRepository := factory.CreateEvaluationProjectRepository(map[string]interface{}{"transactionContext": transactionContext})  
48 - cycleRepository := factory.CreateEvaluationCycleRepository(map[string]interface{}{"transactionContext": transactionContext})  
49 47
50 - projectIdsMap := map[int64]*domain.EvaluationProject{}  
51 - cycleIdsMap := map[int64]*domain.EvaluationCycle{} 48 + projectMap := map[int64]*domain.EvaluationProject{}
  49 + cycleMap := map[int64]*domain.EvaluationCycle{}
52 for i := range tasks { 50 for i := range tasks {
53 - task := tasks[i]  
54 - projectIdsMap[task.ProjectId] = nil  
55 - cycleIdsMap[task.CycleId] = nil 51 + projectMap[tasks[i].ProjectId] = nil
  52 + cycleMap[tasks[i].CycleId] = nil
56 } 53 }
57 projectIds := make([]int64, 0) 54 projectIds := make([]int64, 0)
58 cycleIds := make([]int64, 0) 55 cycleIds := make([]int64, 0)
59 - for k := range projectIdsMap { 56 + for k := range projectMap {
60 projectIds = append(projectIds, k) 57 projectIds = append(projectIds, k)
61 } 58 }
62 - for k := range cycleIdsMap { 59 + for k := range cycleMap {
63 cycleIds = append(cycleIds, k) 60 cycleIds = append(cycleIds, k)
64 } 61 }
65 62
  63 + if len(projectIds) > 0 {
  64 + projectRepository := factory.CreateEvaluationProjectRepository(map[string]interface{}{"transactionContext": transactionContext})
66 _, projects, err := projectRepository.Find(map[string]interface{}{"ids": projectIds}, "template") 65 _, projects, err := projectRepository.Find(map[string]interface{}{"ids": projectIds}, "template")
67 if err != nil { 66 if err != nil {
68 return application.ThrowError(application.INTERNAL_SERVER_ERROR, err.Error()) 67 return application.ThrowError(application.INTERNAL_SERVER_ERROR, err.Error())
69 } 68 }
  69 + for i := range projects {
  70 + projectMap[projects[i].Id] = projects[i]
  71 + }
  72 + }
  73 + if len(cycleIds) > 0 {
  74 + cycleRepository := factory.CreateEvaluationCycleRepository(map[string]interface{}{"transactionContext": transactionContext})
70 _, cycles, err := cycleRepository.Find(map[string]interface{}{"ids": cycleIds}) 75 _, cycles, err := cycleRepository.Find(map[string]interface{}{"ids": cycleIds})
71 if err != nil { 76 if err != nil {
72 return application.ThrowError(application.INTERNAL_SERVER_ERROR, err.Error()) 77 return application.ThrowError(application.INTERNAL_SERVER_ERROR, err.Error())
73 } 78 }
  79 + for i := range cycles {
  80 + cycleMap[cycles[i].Id] = cycles[i]
  81 + }
  82 + }
  83 + if err = transactionContext.CommitTransaction(); err != nil {
  84 + return application.ThrowError(application.TRANSACTION_ERROR, err.Error())
  85 + }
74 86
75 - for i := range projects {  
76 - projectIdsMap[projects[i].Id] = projects[i] 87 + // 相同项目节点先聚合
  88 + taskMap := map[int64][]*domain.NodeTask{}
  89 + for i := range tasks {
  90 + task := tasks[i]
  91 + array, ok := taskMap[task.ProjectId]
  92 + if !ok {
  93 + array = make([]*domain.NodeTask, 0)
77 } 94 }
78 - for i := range cycles {  
79 - cycleIdsMap[cycles[i].Id] = cycles[i] 95 + taskMap[task.ProjectId] = append(array, task)
  96 + }
  97 + for k, v := range taskMap {
  98 + project, ok := projectMap[k]
  99 + if ok && project != nil {
  100 + if err = rs.taskSend(project, v, cycleMap); err != nil {
  101 + return err
80 } 102 }
  103 + } else {
  104 + if err = rs.taskAbort(v); err != nil {
  105 + return err
  106 + }
  107 + }
  108 + }
  109 +
  110 + //staffAssessService := service.NewStaffAssessServeice()
  111 + //now := time.Now().Local()
  112 + //for i := range tasks {
  113 + // task := tasks[i]
  114 + // project, ok := projectMap[task.ProjectId] // 项目还存在
  115 + // if ok && project != nil {
  116 + // // 环节截止时间
  117 + // maxTime := task.TimeEnd.Local()
  118 + //
  119 + // // 更新任务最后一次的发送时间(取当前时间)
  120 + // task.LastSentAt = &now
  121 + //
  122 + // // 当前周起始时间和截止时间
  123 + // var cycleTimeStart = task.NextSentAt.Local()
  124 + // var cycleTimeEnd time.Time
  125 + //
  126 + // // 下个周期起始时间
  127 + // nextTime := utils.NextTimeInc(cycleTimeStart, task.KpiCycle)
  128 + // // 超过截止时间
  129 + // if nextTime.After(maxTime) {
  130 + // task.NextSentAt = nil
  131 + // } else {
  132 + // task.NextSentAt = &nextTime
  133 + // }
  134 + //
  135 + // // 周期的截至时间=下一个周期的开始时间-1秒(需求方要求提交数据时间延长到第二天8点30分截止)
  136 + // if task.NextSentAt == nil {
  137 + // //cycleTimeEnd = maxTime
  138 + // maxYear, maxMonth, maxDay := maxTime.Date()
  139 + // cycleTimeEnd = time.Date(maxYear, maxMonth, maxDay, 0, 0, 0, 0, time.Local)
  140 + // cycleTimeEnd = cycleTimeEnd.Add(24*time.Hour + 8*time.Hour + 30*time.Minute) // 注.延长8.5小时
  141 + // } else {
  142 + // //cycleTimeEnd = task.NextSentAt.Local().Add(-1 * time.Second) // 周期截至时间=下一个周期起始时间-1秒
  143 + // cycleTimeEnd = task.NextSentAt.Local().Add(8*time.Hour + 30*time.Minute) // 注.延长8.5小时
  144 + // }
  145 + //
  146 + // // 格式化周期的起始和截止时间
  147 + // fmCycleStartTime := cycleTimeStart.Format("2006-01-02 15:04:05")
  148 + // fmCycleTimeEnd := cycleTimeEnd.Format("2006-01-02 15:04:05")
  149 + //
  150 + // csat := &command.CreateStaffAssessTask{
  151 + // CompanyId: int(project.CompanyId),
  152 + // EvaluationProjectId: int(project.Id),
  153 + // EvaluationProjectName: project.Name,
  154 + // CycleId: project.CycleId,
  155 + // StepList: make([]command.AssessTaskStep, 0),
  156 + // }
  157 + //
  158 + // // 周期名称
  159 + // if cycle, ok := cycleMap[project.CycleId]; ok {
  160 + // csat.CycleName = cycle.Name
  161 + // }
  162 + //
  163 + // // 接收人
  164 + // csat.ExecutorId = make([]int, 0)
  165 + // for rIndex := range project.Recipients {
  166 + // vInt, _ := strconv.Atoi(project.Recipients[rIndex])
  167 + // csat.ExecutorId = append(csat.ExecutorId, vInt)
  168 + // }
  169 + //
  170 + // csat.BeginTime = fmCycleStartTime
  171 + // csat.EndTime = fmCycleTimeEnd
  172 + // csat.StepList = append(csat.StepList, command.AssessTaskStep{
  173 + // SortBy: task.NodeSort,
  174 + // LinkNodeId: int(task.NodeId),
  175 + // LinkNodeName: task.NodeName,
  176 + // LinkNodeType: task.NodeType,
  177 + // BeginTime: fmCycleStartTime,
  178 + // EndTime: fmCycleTimeEnd,
  179 + // })
  180 + //
  181 + // // 创建发送任务
  182 + // _, err := staffAssessService.CreateStaffAssessTask(transactionContext, csat)
  183 + // if err != nil {
  184 + // return application.ThrowError(application.INTERNAL_SERVER_ERROR, "创建发送任务"+err.Error())
  185 + // }
  186 + // } else {
  187 + // task.NextSentAt = nil // 项目不存在,取消周期任务发送
  188 + // }
  189 + //
  190 + // task, err := taskRepository.Insert(task)
  191 + // if err != nil {
  192 + // return application.ThrowError(application.INTERNAL_SERVER_ERROR, err.Error())
  193 + // }
  194 + //}
81 195
82 - staffAssessService := service.NewStaffAssessServeice() 196 + return nil
  197 +
  198 +}
  199 +
  200 +func (rs *NodeTaskService) taskSend(project *domain.EvaluationProject, tasks []*domain.NodeTask, cycleMap map[int64]*domain.EvaluationCycle) error {
  201 + transactionContext, err := factory.StartTransaction()
  202 + if err != nil {
  203 + return err
  204 + }
  205 + defer func() {
  206 + _ = transactionContext.RollbackTransaction()
  207 + if err := recover(); err != nil {
  208 + log.Logger.Error(application.ThrowError(application.BUSINESS_ERROR, fmt.Sprintf("定时发送评估任务异常:%s", err)).Error())
  209 + }
  210 + }()
  211 + taskRepository := factory.CreateNodeTaskRepository(map[string]interface{}{"transactionContext": transactionContext})
83 212
84 now := time.Now().Local() 213 now := time.Now().Local()
  214 + csat := &command.CreateStaffAssessTask{
  215 + CompanyId: int(project.CompanyId),
  216 + EvaluationProjectId: int(project.Id),
  217 + EvaluationProjectName: project.Name,
  218 + CycleId: project.CycleId,
  219 + StepList: make([]command.AssessTaskStep, 0),
  220 + }
  221 + // 周期名称
  222 + if cycle, ok := cycleMap[project.CycleId]; ok {
  223 + csat.CycleName = cycle.Name
  224 + }
  225 +
  226 + // 接收人
  227 + csat.ExecutorId = make([]int, 0)
  228 + for rIndex := range project.Recipients {
  229 + vInt, _ := strconv.Atoi(project.Recipients[rIndex])
  230 + csat.ExecutorId = append(csat.ExecutorId, vInt)
  231 + }
  232 +
85 for i := range tasks { 233 for i := range tasks {
86 task := tasks[i] 234 task := tasks[i]
87 - project, ok := projectIdsMap[task.ProjectId] // 项目  
88 - if ok && project != nil { 235 +
89 // 环节截止时间 236 // 环节截止时间
90 maxTime := task.TimeEnd.Local() 237 maxTime := task.TimeEnd.Local()
91 238
92 // 更新任务最后一次的发送时间(取当前时间) 239 // 更新任务最后一次的发送时间(取当前时间)
93 task.LastSentAt = &now 240 task.LastSentAt = &now
94 241
95 - // 当前周起始时间和截止时间 242 + // 当前小周期范围[起始时间-截止时间]
96 var cycleTimeStart = task.NextSentAt.Local() 243 var cycleTimeStart = task.NextSentAt.Local()
97 var cycleTimeEnd time.Time 244 var cycleTimeEnd time.Time
98 245
@@ -104,8 +251,13 @@ func (rs *NodeTaskService) SendEvaluationNode() error { @@ -104,8 +251,13 @@ func (rs *NodeTaskService) SendEvaluationNode() error {
104 } else { 251 } else {
105 task.NextSentAt = &nextTime 252 task.NextSentAt = &nextTime
106 } 253 }
  254 + // 更新下个周期
  255 + _, err = taskRepository.Insert(task)
  256 + if err != nil {
  257 + return application.ThrowError(application.INTERNAL_SERVER_ERROR, err.Error())
  258 + }
107 259
108 - // 周期的截至时间=下一个周期的开始时间-1秒(需求方要求提交数据时间延长到第二天8点30分截止) 260 + // 周期的截至时间=下一个周期的开始时间-1秒(需求方要求:提交数据时间延长到第二天8点30分截止)
109 if task.NextSentAt == nil { 261 if task.NextSentAt == nil {
110 //cycleTimeEnd = maxTime 262 //cycleTimeEnd = maxTime
111 maxYear, maxMonth, maxDay := maxTime.Date() 263 maxYear, maxMonth, maxDay := maxTime.Date()
@@ -120,28 +272,13 @@ func (rs *NodeTaskService) SendEvaluationNode() error { @@ -120,28 +272,13 @@ func (rs *NodeTaskService) SendEvaluationNode() error {
120 fmCycleStartTime := cycleTimeStart.Format("2006-01-02 15:04:05") 272 fmCycleStartTime := cycleTimeStart.Format("2006-01-02 15:04:05")
121 fmCycleTimeEnd := cycleTimeEnd.Format("2006-01-02 15:04:05") 273 fmCycleTimeEnd := cycleTimeEnd.Format("2006-01-02 15:04:05")
122 274
123 - csat := &command.CreateStaffAssessTask{  
124 - CompanyId: int(project.CompanyId),  
125 - EvaluationProjectId: int(project.Id),  
126 - EvaluationProjectName: project.Name,  
127 - CycleId: project.CycleId,  
128 - StepList: make([]command.AssessTaskStep, 0),  
129 - }  
130 -  
131 - // 周期名称  
132 - if cycle, ok := cycleIdsMap[project.CycleId]; ok {  
133 - csat.CycleName = cycle.Name  
134 - }  
135 -  
136 - // 接收人  
137 - csat.ExecutorId = make([]int, 0)  
138 - for rIndex := range project.Recipients {  
139 - vInt, _ := strconv.Atoi(project.Recipients[rIndex])  
140 - csat.ExecutorId = append(csat.ExecutorId, vInt)  
141 - }  
142 - 275 + // 格式化周期的起始和截止时间
  276 + if len(csat.BeginTime) == 0 {
143 csat.BeginTime = fmCycleStartTime 277 csat.BeginTime = fmCycleStartTime
  278 + }
  279 + if len(csat.EndTime) == 0 {
144 csat.EndTime = fmCycleTimeEnd 280 csat.EndTime = fmCycleTimeEnd
  281 + }
145 csat.StepList = append(csat.StepList, command.AssessTaskStep{ 282 csat.StepList = append(csat.StepList, command.AssessTaskStep{
146 SortBy: task.NodeSort, 283 SortBy: task.NodeSort,
147 LinkNodeId: int(task.NodeId), 284 LinkNodeId: int(task.NodeId),
@@ -150,26 +287,41 @@ func (rs *NodeTaskService) SendEvaluationNode() error { @@ -150,26 +287,41 @@ func (rs *NodeTaskService) SendEvaluationNode() error {
150 BeginTime: fmCycleStartTime, 287 BeginTime: fmCycleStartTime,
151 EndTime: fmCycleTimeEnd, 288 EndTime: fmCycleTimeEnd,
152 }) 289 })
  290 + }
153 291
154 // 创建发送任务 292 // 创建发送任务
155 - _, err := staffAssessService.CreateStaffAssessTask(transactionContext, csat) 293 + _, err = service.NewStaffAssessServeice().CreateStaffAssessTask(transactionContext, csat)
156 if err != nil { 294 if err != nil {
157 return application.ThrowError(application.INTERNAL_SERVER_ERROR, "创建发送任务"+err.Error()) 295 return application.ThrowError(application.INTERNAL_SERVER_ERROR, "创建发送任务"+err.Error())
158 } 296 }
159 - } else {  
160 - task.NextSentAt = nil // 项目不存在,取消周期任务发送 297 +
  298 + if err = transactionContext.CommitTransaction(); err != nil {
  299 + return application.ThrowError(application.TRANSACTION_ERROR, err.Error())
161 } 300 }
162 301
163 - task, err := taskRepository.Insert(task) 302 + return nil
  303 +}
  304 +
  305 +// 节点任务中止
  306 +func (rs *NodeTaskService) taskAbort(tasks []*domain.NodeTask) error {
  307 + transactionContext, err := factory.StartTransaction()
  308 + if err != nil {
  309 + return err
  310 + }
  311 + defer func() {
  312 + transactionContext.RollbackTransaction()
  313 + }()
  314 + taskRepository := factory.CreateNodeTaskRepository(map[string]interface{}{"transactionContext": transactionContext})
  315 + for i := range tasks {
  316 + task := tasks[i]
  317 + task.NextSentAt = nil // 项目不存在,取消周期任务发送
  318 + _, err = taskRepository.Insert(task)
164 if err != nil { 319 if err != nil {
165 return application.ThrowError(application.INTERNAL_SERVER_ERROR, err.Error()) 320 return application.ThrowError(application.INTERNAL_SERVER_ERROR, err.Error())
166 } 321 }
167 } 322 }
168 -  
169 - if err := transactionContext.CommitTransaction(); err != nil { 323 + if err = transactionContext.CommitTransaction(); err != nil {
170 return application.ThrowError(application.TRANSACTION_ERROR, err.Error()) 324 return application.ThrowError(application.TRANSACTION_ERROR, err.Error())
171 } 325 }
172 -  
173 return nil 326 return nil
174 -  
175 } 327 }
@@ -131,7 +131,7 @@ func (repo *NodeTaskRepository) Find(queryOptions map[string]interface{}) ([]*do @@ -131,7 +131,7 @@ func (repo *NodeTaskRepository) Find(queryOptions map[string]interface{}) ([]*do
131 var m []*models.NodeTask 131 var m []*models.NodeTask
132 query := tx.Model(&m).Where("deleted_at isnull") 132 query := tx.Model(&m).Where("deleted_at isnull")
133 133
134 - if v, ok := queryOptions["now"].(time.Time); ok { 134 + if v, ok := queryOptions["lessNextSentAt"].(time.Time); ok {
135 query.Where("next_sent_at <= ?", v) 135 query.Where("next_sent_at <= ?", v)
136 } 136 }
137 137