作者 tangxvhui

更新 定时任务

  1 +package notify
  2 +
  3 +import "gitlab.fjmaimaimai.com/allied-creation/performance/pkg/domain"
  4 +
  5 +var TaskSmsNotify *notifySms
  6 +
  7 +func RunTaskSmsNotify() {
  8 + newSmsNotify := notifySms{}
  9 + newSmsNotify.regist(notifyStaffAssess{})
  10 + newSmsNotify.regist(notifySummaryEvaluation{})
  11 +
  12 + newSmsNotify.runTask()
  13 +
  14 +}
  15 +
  16 +// 每日自评短信通知 ,预创建待发送的短信消息
  17 +func AddNotifyStaffAssess(param *domain.StaffAssess) {
  18 + newNotify := notifyStaffAssess{}
  19 + newSms := newNotify.makeNotify(param)
  20 + TaskSmsNotify.addTask(newSms)
  21 +}
  22 +
  23 +// 周期自评短信通知 ,预创建待发送的短信消息
  24 +func AddNotifySummaryEvaluation(param *domain.SummaryEvaluation) {
  25 + newNotify := notifySummaryEvaluation{}
  26 + newSms := newNotify.makeNotify(param)
  27 + TaskSmsNotify.addTask(newSms)
  28 +}
1 package notify 1 package notify
2 2
  3 +import (
  4 + "fmt"
  5 + "time"
  6 +
  7 + "gitlab.fjmaimaimai.com/allied-creation/performance/pkg/application/factory"
  8 + "gitlab.fjmaimaimai.com/allied-creation/performance/pkg/constant"
  9 + "gitlab.fjmaimaimai.com/allied-creation/performance/pkg/domain"
  10 + "gitlab.fjmaimaimai.com/allied-creation/performance/pkg/infrastructure/serviceGateway"
  11 + "gitlab.fjmaimaimai.com/allied-creation/performance/pkg/log"
  12 +)
  13 +
  14 +type notifySendOrNot interface {
  15 + from() string
  16 + ifSend(index int) (bool, error)
  17 +}
  18 +
3 // 短信通知 19 // 短信通知
  20 +type notifySms struct {
  21 + newSms chan *domain.LogSms
  22 + interval time.Duration
  23 + sendOrNot map[string]notifySendOrNot //到点后判断是否真的发送短信消息
  24 +}
  25 +
  26 +func (notices *notifySms) init() {
  27 + notices.newSms = make(chan *domain.LogSms, 50)
  28 + notices.interval = 10 * time.Minute
  29 + if constant.Env != "prd" {
  30 + notices.interval = 1 * time.Minute
  31 + }
  32 + notices.sendOrNot = map[string]notifySendOrNot{}
  33 +}
  34 +
  35 +func (notices *notifySms) regist(ifsend notifySendOrNot) {
  36 + notices.sendOrNot[ifsend.from()] = ifsend
  37 +}
  38 +
  39 +func (notices *notifySms) addTask(task *domain.LogSms) {
  40 + notices.newSms <- task
  41 +}
  42 +
  43 +// RunTask 执行短信通知任务
  44 +func (notices *notifySms) runTask() {
  45 + notices.init()
  46 + timer := time.NewTimer(notices.interval)
  47 + for {
  48 + select {
  49 + case newSms := <-notices.newSms:
  50 + err := notices.addNewSms(newSms)
  51 + if err != nil {
  52 + e := fmt.Sprintf("添加短信通知任务:%+v %s", newSms, err)
  53 + log.Logger.Error(e)
  54 + }
  55 + case <-timer.C:
  56 + err := notices.checkSendSms()
  57 + if err != nil {
  58 + log.Logger.Error("检查发送短信通知任务:" + err.Error())
  59 + }
  60 + timer.Reset(notices.interval) // 重置定时
  61 + }
  62 + }
  63 +}
  64 +
  65 +func (notices *notifySms) addNewSms(newSms *domain.LogSms) error {
  66 + transactionContext, err := factory.CreateTransactionContext(nil)
  67 + if err != nil {
  68 + return err
  69 + }
  70 + if err := transactionContext.StartTransaction(); err != nil {
  71 + return err
  72 + }
  73 + defer func() {
  74 + _ = transactionContext.RollbackTransaction()
  75 + }()
  76 + logSmsRepo := factory.CreateLogSmsRepository(map[string]interface{}{"transactionContext": transactionContext})
  77 + err = logSmsRepo.Save(newSms)
  78 + if err != nil {
  79 + return err
  80 + }
  81 + if err := transactionContext.CommitTransaction(); err != nil {
  82 + return err
  83 + }
  84 + return nil
  85 +}
  86 +
  87 +func (notices *notifySms) checkSendSms() error {
  88 + transactionContext, err := factory.CreateTransactionContext(nil)
  89 + if err != nil {
  90 + return err
  91 + }
  92 + if err := transactionContext.StartTransaction(); err != nil {
  93 + return err
  94 + }
  95 + defer func() {
  96 + _ = transactionContext.RollbackTransaction()
  97 + }()
  98 + logSmsRepo := factory.CreateLogSmsRepository(map[string]interface{}{"transactionContext": transactionContext})
  99 + nowTime := time.Now()
  100 + nowDay := dayZeroTime(nowTime)
  101 + _, logSmsList, err := logSmsRepo.Find(map[string]interface{}{
  102 + "status": string(domain.SmsWait),
  103 + "executeAtBegin": nowDay,
  104 + "executeAtEnd": nowTime,
  105 + })
  106 + if err != nil {
  107 + return err
  108 + }
  109 +
  110 + if err := transactionContext.CommitTransaction(); err != nil {
  111 + return err
  112 + }
  113 + for _, v := range logSmsList {
  114 + err = notices.sendSms(v)
  115 + if err != nil {
  116 + e := fmt.Sprintf("发送短信通知:%+v %s", *v, err)
  117 + log.Logger.Error(e)
  118 + }
  119 + }
  120 + return nil
  121 +}
  122 +
  123 +func (notices *notifySms) sendSms(param *domain.LogSms) error {
  124 + //单开处理 数据保存操作,发一条短信更新一条数据
  125 + transactionContext, err := factory.CreateTransactionContext(nil)
  126 + if err != nil {
  127 + return err
  128 + }
  129 + if err := transactionContext.StartTransaction(); err != nil {
  130 + return err
  131 + }
  132 + defer func() {
  133 + _ = transactionContext.RollbackTransaction()
  134 + }()
  135 + logSmsRepo := factory.CreateLogSmsRepository(map[string]interface{}{"transactionContext": transactionContext})
  136 +
  137 + sendOk := false
  138 + sendOrNot, ok := notices.sendOrNot[param.From]
  139 + if ok {
  140 + ok, err := sendOrNot.ifSend(param.Index)
  141 + if err != nil {
  142 + param.Result = err.Error()
  143 +
  144 + }
  145 + sendOk = ok
  146 + } else {
  147 + sendOk = true
  148 + }
  149 + if !sendOk {
  150 + param.Status = domain.SmsIgnore
  151 + } else {
  152 + sms := serviceGateway.SmsService{}
  153 + err = sms.SendNoticeSms(param.Phone, param.TemplateId, param.Value)
  154 + if err != nil {
  155 + param.Result = err.Error()
  156 + } else {
  157 + param.Status = domain.SmsSuccess
  158 + }
  159 + }
  160 + err = logSmsRepo.Save(param)
  161 + if err != nil {
  162 + return err
  163 + }
  164 + if err := transactionContext.CommitTransaction(); err != nil {
  165 + return err
  166 + }
  167 + return nil
  168 +}
  169 +
  170 +func dayZeroTime(t time.Time) time.Time {
  171 + y, m, d := t.UTC().Date()
  172 + t2 := time.Date(y, m, d, 0, 0, 0, 0, time.UTC)
  173 + return t2
  174 +}
1 package notify 1 package notify
2 2
3 import ( 3 import (
4 - "fmt"  
5 "time" 4 "time"
6 5
7 "gitlab.fjmaimaimai.com/allied-creation/performance/pkg/application/factory" 6 "gitlab.fjmaimaimai.com/allied-creation/performance/pkg/application/factory"
8 - "gitlab.fjmaimaimai.com/allied-creation/performance/pkg/constant"  
9 "gitlab.fjmaimaimai.com/allied-creation/performance/pkg/domain" 7 "gitlab.fjmaimaimai.com/allied-creation/performance/pkg/domain"
10 - "gitlab.fjmaimaimai.com/allied-creation/performance/pkg/log"  
11 ) 8 )
12 9
13 -// 短信通知  
14 -  
15 // 每日自评短信通知 10 // 每日自评短信通知
16 -// 条件:每日自评结束前30分钟,且还未完成评估填写  
17 -type NotifyStaffAssess struct {  
18 - newSms chan *domain.LogSms  
19 - interval time.Duration 11 +type notifyStaffAssess struct {
20 } 12 }
21 13
22 -func (notices *NotifyStaffAssess) init() {  
23 - notices.newSms = make(chan *domain.LogSms, 50)  
24 - notices.interval = 10 * time.Minute  
25 - if constant.Env != "prd" {  
26 - notices.interval = 1 * time.Minute  
27 - }  
28 -}  
29 -  
30 -func (notices *NotifyStaffAssess) From() string { 14 +func (notices notifyStaffAssess) from() string {
31 return "StaffAssess" 15 return "StaffAssess"
32 } 16 }
33 17
34 -// AddTask 添加待执行的短信通知任务  
35 -func (notices *NotifyStaffAssess) AddTask(index int, phone string, param map[string]string, executeAt time.Time) {  
36 - newSms := &domain.LogSms{ 18 +// makeNotify 生成待执行的短信通知内容
  19 +func (notices notifyStaffAssess) makeNotify(param *domain.StaffAssess) *domain.LogSms {
  20 + newSms := domain.LogSms{
37 Id: 0, 21 Id: 0,
38 - Phone: phone, 22 + Phone: param.Executor.Account,
39 TemplateId: 5475050, 23 TemplateId: 5475050,
40 Template: "您好,#name#,百忙之中不要忘记填写今天的绩效自评反馈哦", 24 Template: "您好,#name#,百忙之中不要忘记填写今天的绩效自评反馈哦",
41 - Value: param,  
42 - Result: "",  
43 - Status: domain.SmsWait,  
44 - From: notices.From(),  
45 - Index: index,  
46 - ExecuteAt: executeAt,  
47 - CreatedAt: time.Now(), 25 + Value: map[string]string{
  26 + "name": param.Executor.UserName,
  27 + },
  28 + Result: "",
  29 + Status: domain.SmsWait,
  30 + From: notices.from(),
  31 + Index: param.Id,
  32 + // ExecuteAt: executeAt,
  33 + CreatedAt: time.Now(),
48 } 34 }
49 - notices.newSms <- newSms 35 + // 每日自评 结束前30 分钟
  36 + newSms.ExecuteAt = param.EndTime.Add(-1800 * time.Second)
  37 + return &newSms
50 } 38 }
51 39
52 -// RunTask 执行短信通知任务  
53 -func (notices *NotifyStaffAssess) RunTask() {  
54 - notices.init()  
55 - timer := time.NewTimer(notices.interval)  
56 - for {  
57 - select {  
58 - case newSms := <-notices.newSms:  
59 - err := notices.addNewSms(newSms)  
60 - if err != nil {  
61 - e := fmt.Sprintf("添加短信通知任务:%+v %s", newSms, err)  
62 - log.Logger.Error(e)  
63 - }  
64 - case <-timer.C:  
65 - err := notices.sendSms()  
66 - if err != nil {  
67 - log.Logger.Error("发送短信通知任务:" + err.Error())  
68 - }  
69 - timer.Reset(notices.interval) // 重置定时  
70 - }  
71 - }  
72 -}  
73 -  
74 -func (notices *NotifyStaffAssess) addNewSms(newSms *domain.LogSms) error { 40 +// ifSendSms 确认是否发送通知
  41 +func (notices notifyStaffAssess) ifSend(index int) (bool, error) {
75 transactionContext, err := factory.CreateTransactionContext(nil) 42 transactionContext, err := factory.CreateTransactionContext(nil)
76 if err != nil { 43 if err != nil {
77 - return err 44 + return false, err
78 } 45 }
79 if err := transactionContext.StartTransaction(); err != nil { 46 if err := transactionContext.StartTransaction(); err != nil {
80 - return err 47 + return false, err
81 } 48 }
82 defer func() { 49 defer func() {
83 _ = transactionContext.RollbackTransaction() 50 _ = transactionContext.RollbackTransaction()
84 }() 51 }()
85 - logSmsRepo := factory.CreateLogSmsRepository(map[string]interface{}{"transactionContext": transactionContext})  
86 - err = logSmsRepo.Save(newSms)  
87 - if err != nil {  
88 - return err  
89 - }  
90 - if err := transactionContext.CommitTransaction(); err != nil {  
91 - return err  
92 - }  
93 - return nil  
94 -}  
95 -  
96 -func (notices *NotifyStaffAssess) sendSms() error {  
97 - transactionContext, err := factory.CreateTransactionContext(nil) 52 + staffAssessRepo := factory.CreateStaffAssessRepository(map[string]interface{}{"transactionContext": transactionContext})
  53 + assessData, err := staffAssessRepo.FindOne(map[string]interface{}{"id": index})
98 if err != nil { 54 if err != nil {
99 - return err 55 + return false, err
100 } 56 }
101 - if err := transactionContext.StartTransaction(); err != nil {  
102 - return err 57 + //还未完成评估填写,时发送短信
  58 + if assessData.Status == domain.StaffAssessUncompleted {
  59 + return true, nil
103 } 60 }
104 - defer func() {  
105 - _ = transactionContext.RollbackTransaction()  
106 - }()  
107 - logSmsRepo := factory.CreateLogSmsRepository(map[string]interface{}{"transactionContext": transactionContext})  
108 - _ = logSmsRepo  
109 if err := transactionContext.CommitTransaction(); err != nil { 61 if err := transactionContext.CommitTransaction(); err != nil {
110 - return err 62 + return false, err
111 } 63 }
112 - return nil 64 + return false, nil
113 } 65 }
1 package notify 1 package notify
2 2
3 import ( 3 import (
  4 + "time"
  5 +
4 "gitlab.fjmaimaimai.com/allied-creation/performance/pkg/application/factory" 6 "gitlab.fjmaimaimai.com/allied-creation/performance/pkg/application/factory"
5 "gitlab.fjmaimaimai.com/allied-creation/performance/pkg/domain" 7 "gitlab.fjmaimaimai.com/allied-creation/performance/pkg/domain"
6 ) 8 )
7 9
8 // 周期评估短信通知 10 // 周期评估短信通知
9 // 条件:周期自评结束前4个小时,且还未完成评估填写 11 // 条件:周期自评结束前4个小时,且还未完成评估填写
10 -type NotifySummaryEvaluation struct {  
11 -}  
12 -  
13 -func (notices *NotifySummaryEvaluation) Init() *NotifySummaryEvaluation {  
14 - return &NotifySummaryEvaluation{} 12 +type notifySummaryEvaluation struct {
15 } 13 }
16 14
17 -func (notices *NotifySummaryEvaluation) From() string { 15 +func (notices notifySummaryEvaluation) from() string {
18 return "SummaryEvaluation" 16 return "SummaryEvaluation"
19 } 17 }
20 18
21 -// AddTask 添加待执行的短信通知任务  
22 -func (notices *NotifySummaryEvaluation) AddTask(index string, phone string, param map[string]string) error {  
23 -  
24 - return nil  
25 -}  
26 -  
27 -// RunTask 执行短信通知任务  
28 -func (notice *NotifySummaryEvaluation) RunTask() error {  
29 -  
30 - return nil 19 +// makeNotify 生成待执行的短信通知内容
  20 +func (notices notifySummaryEvaluation) makeNotify(param *domain.SummaryEvaluation) *domain.LogSms {
  21 + newSms := domain.LogSms{
  22 + Id: 0,
  23 + Phone: param.Executor.Account,
  24 + TemplateId: 5536232,
  25 + Template: "您好,#name#,百忙之中不要忘记填写本月综合自评哦",
  26 + Value: map[string]string{
  27 + "name": param.Executor.UserName,
  28 + },
  29 + Result: "",
  30 + Status: domain.SmsWait,
  31 + From: notices.from(),
  32 + Index: param.Id,
  33 + // ExecuteAt: executeAt,
  34 + CreatedAt: time.Now(),
  35 + }
  36 + // 周期自评结束前4个小时,
  37 + newSms.ExecuteAt = param.EndTime.Add(-4 * time.Hour)
  38 + return &newSms
31 } 39 }
32 40
33 -func (notice *NotifySummaryEvaluation) addNewSms(newSms *domain.LogSms) error { 41 +// ifSendSms 确认是否发送通知
  42 +func (notices notifySummaryEvaluation) ifSend(index int) (bool, error) {
34 transactionContext, err := factory.CreateTransactionContext(nil) 43 transactionContext, err := factory.CreateTransactionContext(nil)
35 if err != nil { 44 if err != nil {
36 - return err 45 + return false, err
37 } 46 }
38 if err := transactionContext.StartTransaction(); err != nil { 47 if err := transactionContext.StartTransaction(); err != nil {
39 - return err 48 + return false, err
40 } 49 }
41 defer func() { 50 defer func() {
42 _ = transactionContext.RollbackTransaction() 51 _ = transactionContext.RollbackTransaction()
43 }() 52 }()
44 - logSmsRepo := factory.CreateLogSmsRepository(map[string]interface{}{"transactionContext": transactionContext})  
45 - err = logSmsRepo.Save(newSms) 53 + summaryEvaluationRepo := factory.CreateSummaryEvaluationRepository(map[string]interface{}{"transactionContext": transactionContext})
  54 + summaryEvaluationData, err := summaryEvaluationRepo.FindOne(map[string]interface{}{"id": index})
46 if err != nil { 55 if err != nil {
47 - return err 56 + return false, err
  57 + }
  58 + //还未完成评估填写,时发送短信
  59 + if summaryEvaluationData.Status == domain.EvaluationUncompleted {
  60 + return true, nil
48 } 61 }
49 if err := transactionContext.CommitTransaction(); err != nil { 62 if err := transactionContext.CommitTransaction(); err != nil {
50 - return err 63 + return false, err
51 } 64 }
52 - return nil 65 + return false, nil
53 } 66 }
@@ -328,6 +328,3 @@ func sendSmsEvalation(param []domain.SummaryEvaluation) error { @@ -328,6 +328,3 @@ func sendSmsEvalation(param []domain.SummaryEvaluation) error {
328 } 328 }
329 return nil 329 return nil
330 } 330 }
331 -  
332 -//周期自评 短信提醒  
333 -//周期自评 结束前 4个小时发送  
@@ -5,26 +5,24 @@ import "time" @@ -5,26 +5,24 @@ import "time"
5 //记录 发送的短信消息 5 //记录 发送的短信消息
6 6
7 type LogSms struct { 7 type LogSms struct {
8 - Id int  
9 - Phone string  
10 - TemplateId int  
11 - Template string  
12 - Value map[string]string  
13 - Result string  
14 - Status SmsStatus  
15 - From string //业务来源  
16 - Index int //业务数据索引  
17 - ExecuteAt time.Time  
18 - CreatedAt time.Time 8 + Id int `json:"id"`
  9 + Phone string `json:"phone"`
  10 + TemplateId int `json:"templateId"`
  11 + Template string `json:"template"`
  12 + Value map[string]string `json:"value"`
  13 + Result string `json:"result"`
  14 + Status SmsStatus `json:"status"`
  15 + From string `json:"from"` //业务来源
  16 + Index int `json:"index"` //业务数据索引
  17 + ExecuteAt time.Time `json:"executeAt"`
  18 + CreatedAt time.Time `json:"createdAt"`
19 } 19 }
20 20
21 type SmsStatus string 21 type SmsStatus string
22 22
23 const ( 23 const (
24 SmsWait SmsStatus = "wait" //等待执行 24 SmsWait SmsStatus = "wait" //等待执行
25 - SmsInit SmsStatus = "init" //正在执行  
26 SmsSuccess SmsStatus = "success" //执行成功 25 SmsSuccess SmsStatus = "success" //执行成功
27 - SmsFailed SmsStatus = "failed" //执行失败  
28 SmsIgnore SmsStatus = "ignore" //忽略执行 26 SmsIgnore SmsStatus = "ignore" //忽略执行
29 ) 27 )
30 28
@@ -63,20 +63,16 @@ func (repo *LogSmsRepository) Find(queryOptions map[string]interface{}) (int, [] @@ -63,20 +63,16 @@ func (repo *LogSmsRepository) Find(queryOptions map[string]interface{}) (int, []
63 query.Offset(v) 63 query.Offset(v)
64 } 64 }
65 65
66 - if v, ok := queryOptions["form"]; ok {  
67 - query.Where("from=?", v)  
68 - }  
69 -  
70 - if v, ok := queryOptions["index"]; ok {  
71 - query.Where("index=?", v)  
72 - }  
73 -  
74 if v, ok := queryOptions["status"]; ok { 66 if v, ok := queryOptions["status"]; ok {
75 query.Where("status=?", v) 67 query.Where("status=?", v)
76 } 68 }
77 69
78 - if v, ok := queryOptions["executeAt"]; ok {  
79 - query.Where("execute_at=?", v) 70 + if v, ok := queryOptions["executeAtBegin"]; ok {
  71 + query.Where("execute_at>=?", v)
  72 + }
  73 +
  74 + if v, ok := queryOptions["executeAtEnd"]; ok {
  75 + query.Where("execute_at<=?", v)
80 } 76 }
81 77
82 count, err := query.SelectAndCount() 78 count, err := query.SelectAndCount()