node_task_service.go 10.7 KB
package service

import (
	"fmt"
	"strconv"
	"time"

	"gitlab.fjmaimaimai.com/allied-creation/performance/pkg/log"

	"github.com/linmadan/egglib-go/core/application"
	"gitlab.fjmaimaimai.com/allied-creation/performance/pkg/application/factory"
	"gitlab.fjmaimaimai.com/allied-creation/performance/pkg/application/staff_assess/command"
	"gitlab.fjmaimaimai.com/allied-creation/performance/pkg/application/staff_assess/service"
	"gitlab.fjmaimaimai.com/allied-creation/performance/pkg/domain"
	"gitlab.fjmaimaimai.com/allied-creation/performance/pkg/utils"
)

type NodeTaskService struct {
}

func NewNodeTaskService() *NodeTaskService {
	newRoleService := &NodeTaskService{}
	return newRoleService
}

// SendEvaluationNode 发送评估环节
func (rs *NodeTaskService) SendEvaluationNode() error {
	transactionContext, err := factory.StartTransaction()
	if err != nil {
		return err
	}
	defer func() {
		transactionContext.RollbackTransaction()

		if err := recover(); err != nil {
			log.Logger.Error(application.ThrowError(application.BUSINESS_ERROR, fmt.Sprintf("定时发送评估任务[基础查询]异常:%s", err)).Error())
		}
	}()
	taskRepository := factory.CreateNodeTaskRepository(map[string]interface{}{"transactionContext": transactionContext})
	tasks, err := taskRepository.Find(map[string]interface{}{"lessNextSentAt": time.Now().Local()})
	if err != nil {
		return application.ThrowError(application.INTERNAL_SERVER_ERROR, err.Error())
	}
	if len(tasks) == 0 {
		return nil
	}

	projectMap := map[int64]*domain.EvaluationProject{}
	cycleMap := map[int64]*domain.EvaluationCycle{}
	for i := range tasks {
		projectMap[tasks[i].ProjectId] = nil
		cycleMap[tasks[i].CycleId] = nil
	}
	projectIds := make([]int64, 0)
	cycleIds := make([]int64, 0)
	for k := range projectMap {
		projectIds = append(projectIds, k)
	}
	for k := range cycleMap {
		cycleIds = append(cycleIds, k)
	}

	if len(projectIds) > 0 {
		projectRepository := factory.CreateEvaluationProjectRepository(map[string]interface{}{"transactionContext": transactionContext})
		_, projects, err := projectRepository.Find(map[string]interface{}{"ids": projectIds}, "template")
		if err != nil {
			return application.ThrowError(application.INTERNAL_SERVER_ERROR, err.Error())
		}
		for i := range projects {
			projectMap[projects[i].Id] = projects[i]
		}
	}
	if len(cycleIds) > 0 {
		cycleRepository := factory.CreateEvaluationCycleRepository(map[string]interface{}{"transactionContext": transactionContext})
		_, cycles, err := cycleRepository.Find(map[string]interface{}{"ids": cycleIds})
		if err != nil {
			return application.ThrowError(application.INTERNAL_SERVER_ERROR, err.Error())
		}
		for i := range cycles {
			cycleMap[cycles[i].Id] = cycles[i]
		}
	}
	if err = transactionContext.CommitTransaction(); err != nil {
		return application.ThrowError(application.TRANSACTION_ERROR, err.Error())
	}

	// 相同项目节点先聚合
	taskMap := map[int64][]*domain.NodeTask{}
	for i := range tasks {
		task := tasks[i]
		array, ok := taskMap[task.ProjectId]
		if !ok {
			array = make([]*domain.NodeTask, 0)
		}
		taskMap[task.ProjectId] = append(array, task)
	}
	for k, v := range taskMap {
		project, ok := projectMap[k]
		if ok && project != nil {
			if err = rs.taskSend(project, v, cycleMap); err != nil {
				return err
			}
		} else {
			if err = rs.taskAbort(v); err != nil {
				return err
			}
		}
	}

	//staffAssessService := service.NewStaffAssessServeice()
	//now := time.Now().Local()
	//for i := range tasks {
	//	task := tasks[i]
	//	project, ok := projectMap[task.ProjectId] // 项目还存在
	//	if ok && project != nil {
	//		// 环节截止时间
	//		maxTime := task.TimeEnd.Local()
	//
	//		// 更新任务最后一次的发送时间(取当前时间)
	//		task.LastSentAt = &now
	//
	//		// 当前周起始时间和截止时间
	//		var cycleTimeStart = task.NextSentAt.Local()
	//		var cycleTimeEnd time.Time
	//
	//		// 下个周期起始时间
	//		nextTime := utils.NextTimeInc(cycleTimeStart, task.KpiCycle)
	//		// 超过截止时间
	//		if nextTime.After(maxTime) {
	//			task.NextSentAt = nil
	//		} else {
	//			task.NextSentAt = &nextTime
	//		}
	//
	//		// 周期的截至时间=下一个周期的开始时间-1秒(需求方要求提交数据时间延长到第二天8点30分截止)
	//		if task.NextSentAt == nil {
	//			//cycleTimeEnd = maxTime
	//			maxYear, maxMonth, maxDay := maxTime.Date()
	//			cycleTimeEnd = time.Date(maxYear, maxMonth, maxDay, 0, 0, 0, 0, time.Local)
	//			cycleTimeEnd = cycleTimeEnd.Add(24*time.Hour + 8*time.Hour + 30*time.Minute) // 注.延长8.5小时
	//		} else {
	//			//cycleTimeEnd = task.NextSentAt.Local().Add(-1 * time.Second) // 周期截至时间=下一个周期起始时间-1秒
	//			cycleTimeEnd = task.NextSentAt.Local().Add(8*time.Hour + 30*time.Minute) // 注.延长8.5小时
	//		}
	//
	//		// 格式化周期的起始和截止时间
	//		fmCycleStartTime := cycleTimeStart.Format("2006-01-02 15:04:05")
	//		fmCycleTimeEnd := cycleTimeEnd.Format("2006-01-02 15:04:05")
	//
	//		csat := &command.CreateStaffAssessTask{
	//			CompanyId:             int(project.CompanyId),
	//			EvaluationProjectId:   int(project.Id),
	//			EvaluationProjectName: project.Name,
	//			CycleId:               project.CycleId,
	//			StepList:              make([]command.AssessTaskStep, 0),
	//		}
	//
	//		// 周期名称
	//		if cycle, ok := cycleMap[project.CycleId]; ok {
	//			csat.CycleName = cycle.Name
	//		}
	//
	//		// 接收人
	//		csat.ExecutorId = make([]int, 0)
	//		for rIndex := range project.Recipients {
	//			vInt, _ := strconv.Atoi(project.Recipients[rIndex])
	//			csat.ExecutorId = append(csat.ExecutorId, vInt)
	//		}
	//
	//		csat.BeginTime = fmCycleStartTime
	//		csat.EndTime = fmCycleTimeEnd
	//		csat.StepList = append(csat.StepList, command.AssessTaskStep{
	//			SortBy:       task.NodeSort,
	//			LinkNodeId:   int(task.NodeId),
	//			LinkNodeName: task.NodeName,
	//			LinkNodeType: task.NodeType,
	//			BeginTime:    fmCycleStartTime,
	//			EndTime:      fmCycleTimeEnd,
	//		})
	//
	//		// 创建发送任务
	//		_, err := staffAssessService.CreateStaffAssessTask(transactionContext, csat)
	//		if err != nil {
	//			return application.ThrowError(application.INTERNAL_SERVER_ERROR, "创建发送任务"+err.Error())
	//		}
	//	} else {
	//		task.NextSentAt = nil // 项目不存在,取消周期任务发送
	//	}
	//
	//	task, err := taskRepository.Insert(task)
	//	if err != nil {
	//		return application.ThrowError(application.INTERNAL_SERVER_ERROR, err.Error())
	//	}
	//}

	return nil

}

func (rs *NodeTaskService) taskSend(project *domain.EvaluationProject, tasks []*domain.NodeTask, cycleMap map[int64]*domain.EvaluationCycle) error {
	transactionContext, err := factory.StartTransaction()
	if err != nil {
		return err
	}
	defer func() {
		_ = transactionContext.RollbackTransaction()
		if err := recover(); err != nil {
			log.Logger.Error(application.ThrowError(application.BUSINESS_ERROR, fmt.Sprintf("定时发送评估任务异常:%s", err)).Error())
		}
	}()
	taskRepository := factory.CreateNodeTaskRepository(map[string]interface{}{"transactionContext": transactionContext})

	now := time.Now().Local()
	csat := &command.CreateStaffAssessTask{
		CompanyId:             int(project.CompanyId),
		EvaluationProjectId:   int(project.Id),
		EvaluationProjectName: project.Name,
		CycleId:               project.CycleId,
		StepList:              make([]command.AssessTaskStep, 0),
	}
	// 周期名称
	if cycle, ok := cycleMap[project.CycleId]; ok {
		csat.CycleName = cycle.Name
	}

	// 接收人
	csat.ExecutorId = make([]int, 0)
	for rIndex := range project.Recipients {
		vInt, _ := strconv.Atoi(project.Recipients[rIndex])
		csat.ExecutorId = append(csat.ExecutorId, vInt)
	}

	for i := range tasks {
		task := tasks[i]

		// 环节截止时间
		maxTime := task.TimeEnd.Local()

		// 更新任务最后一次的发送时间(取当前时间)
		task.LastSentAt = &now

		// 当前小周期范围[起始时间-截止时间]
		var cycleTimeStart = task.NextSentAt.Local()
		var cycleTimeEnd time.Time

		// 下个周期起始时间
		nextTime := utils.NextTimeInc(cycleTimeStart, task.KpiCycle)
		// 超过截止时间
		if nextTime.After(maxTime) {
			task.NextSentAt = nil
		} else {
			task.NextSentAt = &nextTime
		}
		// 更新下个周期
		_, err = taskRepository.Insert(task)
		if err != nil {
			return application.ThrowError(application.INTERNAL_SERVER_ERROR, err.Error())
		}

		// 周期的截至时间=下一个周期的开始时间-1秒(需求方要求:提交数据时间延长到第二天8点30分截止)
		if task.NextSentAt == nil {
			//cycleTimeEnd = maxTime
			maxYear, maxMonth, maxDay := maxTime.Date()
			cycleTimeEnd = time.Date(maxYear, maxMonth, maxDay, 0, 0, 0, 0, time.Local)
			cycleTimeEnd = cycleTimeEnd.Add(24*time.Hour + 8*time.Hour + 30*time.Minute) // 注.延长8.5小时
		} else {
			//cycleTimeEnd = task.NextSentAt.Local().Add(-1 * time.Second) // 周期截至时间=下一个周期起始时间-1秒
			cycleTimeEnd = task.NextSentAt.Local().Add(8*time.Hour + 30*time.Minute) // 注.延长8.5小时
		}

		// 格式化周期的起始和截止时间
		fmCycleStartTime := cycleTimeStart.Format("2006-01-02 15:04:05")
		fmCycleTimeEnd := cycleTimeEnd.Format("2006-01-02 15:04:05")

		// 格式化周期的起始和截止时间
		if len(csat.BeginTime) == 0 {
			csat.BeginTime = fmCycleStartTime
		}
		if len(csat.EndTime) == 0 {
			csat.EndTime = fmCycleTimeEnd
		}
		csat.StepList = append(csat.StepList, command.AssessTaskStep{
			SortBy:       task.NodeSort,
			LinkNodeId:   int(task.NodeId),
			LinkNodeName: task.NodeName,
			LinkNodeType: task.NodeType,
			BeginTime:    fmCycleStartTime,
			EndTime:      fmCycleTimeEnd,
		})
	}

	// 创建发送任务
	_, err = service.NewStaffAssessServeice().CreateStaffAssessTask(transactionContext, csat)
	if err != nil {
		return application.ThrowError(application.INTERNAL_SERVER_ERROR, "创建发送任务"+err.Error())
	}

	if err = transactionContext.CommitTransaction(); err != nil {
		return application.ThrowError(application.TRANSACTION_ERROR, err.Error())
	}

	return nil
}

// 节点任务中止
func (rs *NodeTaskService) taskAbort(tasks []*domain.NodeTask) error {
	transactionContext, err := factory.StartTransaction()
	if err != nil {
		return err
	}
	defer func() {
		transactionContext.RollbackTransaction()
	}()
	taskRepository := factory.CreateNodeTaskRepository(map[string]interface{}{"transactionContext": transactionContext})
	for i := range tasks {
		task := tasks[i]
		task.NextSentAt = nil // 项目不存在,取消周期任务发送
		_, err = taskRepository.Insert(task)
		if err != nil {
			return application.ThrowError(application.INTERNAL_SERVER_ERROR, err.Error())
		}
	}
	if err = transactionContext.CommitTransaction(); err != nil {
		return application.ThrowError(application.TRANSACTION_ERROR, err.Error())
	}
	return nil
}