审查视图

pkg/application/node_task/node_task_service.go 10.7 KB
郑周 authored
1 2 3
package service

import (
4
	"fmt"
Your Name authored
5 6 7
	"strconv"
	"time"
tangxvhui authored
8 9
	"gitlab.fjmaimaimai.com/allied-creation/performance/pkg/log"
郑周 authored
10 11
	"github.com/linmadan/egglib-go/core/application"
	"gitlab.fjmaimaimai.com/allied-creation/performance/pkg/application/factory"
郑周 authored
12 13
	"gitlab.fjmaimaimai.com/allied-creation/performance/pkg/application/staff_assess/command"
	"gitlab.fjmaimaimai.com/allied-creation/performance/pkg/application/staff_assess/service"
郑周 authored
14
	"gitlab.fjmaimaimai.com/allied-creation/performance/pkg/domain"
郑周 authored
15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
	"gitlab.fjmaimaimai.com/allied-creation/performance/pkg/utils"
)

type NodeTaskService struct {
}

func NewNodeTaskService() *NodeTaskService {
	newRoleService := &NodeTaskService{}
	return newRoleService
}

// SendEvaluationNode 发送评估环节
func (rs *NodeTaskService) SendEvaluationNode() error {
	transactionContext, err := factory.StartTransaction()
	if err != nil {
		return err
	}
	defer func() {
		transactionContext.RollbackTransaction()
34
郑周 authored
35
		if err := recover(); err != nil {
36
			log.Logger.Error(application.ThrowError(application.BUSINESS_ERROR, fmt.Sprintf("定时发送评估任务[基础查询]异常:%s", err)).Error())
郑周 authored
37
		}
郑周 authored
38 39
	}()
	taskRepository := factory.CreateNodeTaskRepository(map[string]interface{}{"transactionContext": transactionContext})
40
	tasks, err := taskRepository.Find(map[string]interface{}{"lessNextSentAt": time.Now().Local()})
郑周 authored
41 42 43
	if err != nil {
		return application.ThrowError(application.INTERNAL_SERVER_ERROR, err.Error())
	}
郑周 authored
44 45 46
	if len(tasks) == 0 {
		return nil
	}
郑周 authored
47
48 49
	projectMap := map[int64]*domain.EvaluationProject{}
	cycleMap := map[int64]*domain.EvaluationCycle{}
郑周 authored
50
	for i := range tasks {
51 52
		projectMap[tasks[i].ProjectId] = nil
		cycleMap[tasks[i].CycleId] = nil
郑周 authored
53 54 55
	}
	projectIds := make([]int64, 0)
	cycleIds := make([]int64, 0)
56
	for k := range projectMap {
郑周 authored
57 58
		projectIds = append(projectIds, k)
	}
59
	for k := range cycleMap {
郑周 authored
60 61 62
		cycleIds = append(cycleIds, k)
	}
63 64 65 66 67 68 69 70 71
	if len(projectIds) > 0 {
		projectRepository := factory.CreateEvaluationProjectRepository(map[string]interface{}{"transactionContext": transactionContext})
		_, projects, err := projectRepository.Find(map[string]interface{}{"ids": projectIds}, "template")
		if err != nil {
			return application.ThrowError(application.INTERNAL_SERVER_ERROR, err.Error())
		}
		for i := range projects {
			projectMap[projects[i].Id] = projects[i]
		}
郑周 authored
72
	}
73 74 75 76 77 78 79 80 81 82 83 84
	if len(cycleIds) > 0 {
		cycleRepository := factory.CreateEvaluationCycleRepository(map[string]interface{}{"transactionContext": transactionContext})
		_, cycles, err := cycleRepository.Find(map[string]interface{}{"ids": cycleIds})
		if err != nil {
			return application.ThrowError(application.INTERNAL_SERVER_ERROR, err.Error())
		}
		for i := range cycles {
			cycleMap[cycles[i].Id] = cycles[i]
		}
	}
	if err = transactionContext.CommitTransaction(); err != nil {
		return application.ThrowError(application.TRANSACTION_ERROR, err.Error())
郑周 authored
85
	}
郑周 authored
86
87 88 89 90 91 92 93 94 95
	// 相同项目节点先聚合
	taskMap := map[int64][]*domain.NodeTask{}
	for i := range tasks {
		task := tasks[i]
		array, ok := taskMap[task.ProjectId]
		if !ok {
			array = make([]*domain.NodeTask, 0)
		}
		taskMap[task.ProjectId] = append(array, task)
郑周 authored
96
	}
97 98 99 100 101 102 103 104 105 106 107
	for k, v := range taskMap {
		project, ok := projectMap[k]
		if ok && project != nil {
			if err = rs.taskSend(project, v, cycleMap); err != nil {
				return err
			}
		} else {
			if err = rs.taskAbort(v); err != nil {
				return err
			}
		}
郑周 authored
108
	}
郑周 authored
109
110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211
	//staffAssessService := service.NewStaffAssessServeice()
	//now := time.Now().Local()
	//for i := range tasks {
	//	task := tasks[i]
	//	project, ok := projectMap[task.ProjectId] // 项目还存在
	//	if ok && project != nil {
	//		// 环节截止时间
	//		maxTime := task.TimeEnd.Local()
	//
	//		// 更新任务最后一次的发送时间(取当前时间)
	//		task.LastSentAt = &now
	//
	//		// 当前周起始时间和截止时间
	//		var cycleTimeStart = task.NextSentAt.Local()
	//		var cycleTimeEnd time.Time
	//
	//		// 下个周期起始时间
	//		nextTime := utils.NextTimeInc(cycleTimeStart, task.KpiCycle)
	//		// 超过截止时间
	//		if nextTime.After(maxTime) {
	//			task.NextSentAt = nil
	//		} else {
	//			task.NextSentAt = &nextTime
	//		}
	//
	//		// 周期的截至时间=下一个周期的开始时间-1秒(需求方要求提交数据时间延长到第二天8点30分截止)
	//		if task.NextSentAt == nil {
	//			//cycleTimeEnd = maxTime
	//			maxYear, maxMonth, maxDay := maxTime.Date()
	//			cycleTimeEnd = time.Date(maxYear, maxMonth, maxDay, 0, 0, 0, 0, time.Local)
	//			cycleTimeEnd = cycleTimeEnd.Add(24*time.Hour + 8*time.Hour + 30*time.Minute) // 注.延长8.5小时
	//		} else {
	//			//cycleTimeEnd = task.NextSentAt.Local().Add(-1 * time.Second) // 周期截至时间=下一个周期起始时间-1秒
	//			cycleTimeEnd = task.NextSentAt.Local().Add(8*time.Hour + 30*time.Minute) // 注.延长8.5小时
	//		}
	//
	//		// 格式化周期的起始和截止时间
	//		fmCycleStartTime := cycleTimeStart.Format("2006-01-02 15:04:05")
	//		fmCycleTimeEnd := cycleTimeEnd.Format("2006-01-02 15:04:05")
	//
	//		csat := &command.CreateStaffAssessTask{
	//			CompanyId:             int(project.CompanyId),
	//			EvaluationProjectId:   int(project.Id),
	//			EvaluationProjectName: project.Name,
	//			CycleId:               project.CycleId,
	//			StepList:              make([]command.AssessTaskStep, 0),
	//		}
	//
	//		// 周期名称
	//		if cycle, ok := cycleMap[project.CycleId]; ok {
	//			csat.CycleName = cycle.Name
	//		}
	//
	//		// 接收人
	//		csat.ExecutorId = make([]int, 0)
	//		for rIndex := range project.Recipients {
	//			vInt, _ := strconv.Atoi(project.Recipients[rIndex])
	//			csat.ExecutorId = append(csat.ExecutorId, vInt)
	//		}
	//
	//		csat.BeginTime = fmCycleStartTime
	//		csat.EndTime = fmCycleTimeEnd
	//		csat.StepList = append(csat.StepList, command.AssessTaskStep{
	//			SortBy:       task.NodeSort,
	//			LinkNodeId:   int(task.NodeId),
	//			LinkNodeName: task.NodeName,
	//			LinkNodeType: task.NodeType,
	//			BeginTime:    fmCycleStartTime,
	//			EndTime:      fmCycleTimeEnd,
	//		})
	//
	//		// 创建发送任务
	//		_, err := staffAssessService.CreateStaffAssessTask(transactionContext, csat)
	//		if err != nil {
	//			return application.ThrowError(application.INTERNAL_SERVER_ERROR, "创建发送任务"+err.Error())
	//		}
	//	} else {
	//		task.NextSentAt = nil // 项目不存在,取消周期任务发送
	//	}
	//
	//	task, err := taskRepository.Insert(task)
	//	if err != nil {
	//		return application.ThrowError(application.INTERNAL_SERVER_ERROR, err.Error())
	//	}
	//}

	return nil

}

func (rs *NodeTaskService) taskSend(project *domain.EvaluationProject, tasks []*domain.NodeTask, cycleMap map[int64]*domain.EvaluationCycle) error {
	transactionContext, err := factory.StartTransaction()
	if err != nil {
		return err
	}
	defer func() {
		_ = transactionContext.RollbackTransaction()
		if err := recover(); err != nil {
			log.Logger.Error(application.ThrowError(application.BUSINESS_ERROR, fmt.Sprintf("定时发送评估任务异常:%s", err)).Error())
		}
	}()
	taskRepository := factory.CreateNodeTaskRepository(map[string]interface{}{"transactionContext": transactionContext})
212 213

	now := time.Now().Local()
214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232
	csat := &command.CreateStaffAssessTask{
		CompanyId:             int(project.CompanyId),
		EvaluationProjectId:   int(project.Id),
		EvaluationProjectName: project.Name,
		CycleId:               project.CycleId,
		StepList:              make([]command.AssessTaskStep, 0),
	}
	// 周期名称
	if cycle, ok := cycleMap[project.CycleId]; ok {
		csat.CycleName = cycle.Name
	}

	// 接收人
	csat.ExecutorId = make([]int, 0)
	for rIndex := range project.Recipients {
		vInt, _ := strconv.Atoi(project.Recipients[rIndex])
		csat.ExecutorId = append(csat.ExecutorId, vInt)
	}
郑周 authored
233 234
	for i := range tasks {
		task := tasks[i]
郑周 authored
235
236 237
		// 环节截止时间
		maxTime := task.TimeEnd.Local()
郑周 authored
238
239 240
		// 更新任务最后一次的发送时间(取当前时间)
		task.LastSentAt = &now
郑周 authored
241
242 243 244
		// 当前小周期范围[起始时间-截止时间]
		var cycleTimeStart = task.NextSentAt.Local()
		var cycleTimeEnd time.Time
郑周 authored
245
246 247 248 249 250 251 252 253 254 255 256 257 258
		// 下个周期起始时间
		nextTime := utils.NextTimeInc(cycleTimeStart, task.KpiCycle)
		// 超过截止时间
		if nextTime.After(maxTime) {
			task.NextSentAt = nil
		} else {
			task.NextSentAt = &nextTime
		}
		// 更新下个周期
		_, err = taskRepository.Insert(task)
		if err != nil {
			return application.ThrowError(application.INTERNAL_SERVER_ERROR, err.Error())
		}
郑周 authored
259
260 261 262 263 264 265 266 267 268 269 270 271 272 273
		// 周期的截至时间=下一个周期的开始时间-1秒(需求方要求:提交数据时间延长到第二天8点30分截止)
		if task.NextSentAt == nil {
			//cycleTimeEnd = maxTime
			maxYear, maxMonth, maxDay := maxTime.Date()
			cycleTimeEnd = time.Date(maxYear, maxMonth, maxDay, 0, 0, 0, 0, time.Local)
			cycleTimeEnd = cycleTimeEnd.Add(24*time.Hour + 8*time.Hour + 30*time.Minute) // 注.延长8.5小时
		} else {
			//cycleTimeEnd = task.NextSentAt.Local().Add(-1 * time.Second) // 周期截至时间=下一个周期起始时间-1秒
			cycleTimeEnd = task.NextSentAt.Local().Add(8*time.Hour + 30*time.Minute) // 注.延长8.5小时
		}

		// 格式化周期的起始和截止时间
		fmCycleStartTime := cycleTimeStart.Format("2006-01-02 15:04:05")
		fmCycleTimeEnd := cycleTimeEnd.Format("2006-01-02 15:04:05")
郑周 authored
274
275 276
		// 格式化周期的起始和截止时间
		if len(csat.BeginTime) == 0 {
郑周 authored
277
			csat.BeginTime = fmCycleStartTime
278 279
		}
		if len(csat.EndTime) == 0 {
郑周 authored
280
			csat.EndTime = fmCycleTimeEnd
郑周 authored
281
		}
282 283 284 285 286 287 288 289 290
		csat.StepList = append(csat.StepList, command.AssessTaskStep{
			SortBy:       task.NodeSort,
			LinkNodeId:   int(task.NodeId),
			LinkNodeName: task.NodeName,
			LinkNodeType: task.NodeType,
			BeginTime:    fmCycleStartTime,
			EndTime:      fmCycleTimeEnd,
		})
	}
郑周 authored
291
292 293 294 295
	// 创建发送任务
	_, err = service.NewStaffAssessServeice().CreateStaffAssessTask(transactionContext, csat)
	if err != nil {
		return application.ThrowError(application.INTERNAL_SERVER_ERROR, "创建发送任务"+err.Error())
郑周 authored
296 297
	}
298
	if err = transactionContext.CommitTransaction(); err != nil {
郑周 authored
299 300 301 302
		return application.ThrowError(application.TRANSACTION_ERROR, err.Error())
	}

	return nil
303
}
郑周 authored
304
305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326
// 节点任务中止
func (rs *NodeTaskService) taskAbort(tasks []*domain.NodeTask) error {
	transactionContext, err := factory.StartTransaction()
	if err != nil {
		return err
	}
	defer func() {
		transactionContext.RollbackTransaction()
	}()
	taskRepository := factory.CreateNodeTaskRepository(map[string]interface{}{"transactionContext": transactionContext})
	for i := range tasks {
		task := tasks[i]
		task.NextSentAt = nil // 项目不存在,取消周期任务发送
		_, err = taskRepository.Insert(task)
		if err != nil {
			return application.ThrowError(application.INTERNAL_SERVER_ERROR, err.Error())
		}
	}
	if err = transactionContext.CommitTransaction(); err != nil {
		return application.ThrowError(application.TRANSACTION_ERROR, err.Error())
	}
	return nil
郑周 authored
327
}