作者 tangxvhui

增加 app 消息通知 的定时任务

... ... @@ -19,6 +19,7 @@ func main() {
startSummaryEvaluation()
startConfirmEvaluationScore()
go notify.RunTaskSmsNotify()
go notify.AppMessageRun()
go consumer.Run()
web.Run()
}
... ...
package notify
// 手机端 ,消息通知
// 定时任务检查业务,确定是否发送短信消息
import (
"encoding/json"
"fmt"
"strconv"
"time"
// 关于反馈异常的消息通知;早上9点 开始检查,并确认发送的消息
func taskRecordAnomaly() (map[string]string, error) {
"github.com/linmadan/egglib-go/core/application"
"gitlab.fjmaimaimai.com/allied-creation/performance/pkg/application/factory"
"gitlab.fjmaimaimai.com/allied-creation/performance/pkg/domain"
"gitlab.fjmaimaimai.com/allied-creation/performance/pkg/infrastructure/dao"
"gitlab.fjmaimaimai.com/allied-creation/performance/pkg/log"
)
func messageTaskStageAnomaly(companyId int) ([]*domain.MessagePersonal, error) {
transactionContext, err := factory.CreateTransactionContext(nil)
if err != nil {
return nil, application.ThrowError(application.TRANSACTION_ERROR, err.Error())
}
if err := transactionContext.StartTransaction(); err != nil {
return nil, application.ThrowError(application.TRANSACTION_ERROR, err.Error())
}
defer func() {
_ = transactionContext.RollbackTransaction()
}()
taskDao := dao.NewTaskDao(map[string]interface{}{"transactionContext": transactionContext})
userDao := dao.NewUserDao(map[string]interface{}{"transactionContext": transactionContext})
taskData, err := taskDao.TaskStageAnomalyAll(companyId)
if err != nil {
return nil, fmt.Errorf("获取任务信息%s", err)
}
if len(taskData) == 0 {
return nil, nil
}
var allMessage []*domain.MessagePersonal
for _, val := range taskData {
//获取我全上级
userList, err := userDao.AllParentUser(val.LeaderId)
if err != nil {
return nil, fmt.Errorf("获取上级人员信息%s", err)
}
if len(userList) == 0 {
continue
}
for _, val2 := range userList {
content := ""
if val2.Level == 1 {
content = fmt.Sprintf("【您下级%s关注的项目【%s】里程碑未按时完成,请前往辅导。】", val.LeaderName, val.TaskName)
} else if val2.Level == 2 {
content = fmt.Sprintf("【您下级%s负责的项目【%s】里程碑未按时完成,请前往辅导。】", val.LeaderName, val.TaskName)
} else if val2.Level == 3 {
content = fmt.Sprintf("【您下级%s关注的项目【%s】里程碑未按时完成,请前往辅导。】", val.LeaderName, val.TaskName)
}
if content == "" {
continue
}
payload := map[string]string{
"task_id": strconv.Itoa(val.TaskId),
"task_alias": val.TaskAlias,
"task_name": val.TaskName,
}
payloadStr, _ := json.Marshal(payload)
newMessage := domain.MessagePersonal{
Id: 0,
Types: domain.MessageTypesTaskStageApp,
TargetUserId: val2.Id,
ReadFlag: domain.MessageIsRead,
Title: content,
Content: content,
CreatedAt: time.Time{},
UpdatedAt: time.Time{},
Payload: string(payloadStr),
}
allMessage = append(allMessage, &newMessage)
}
}
if err := transactionContext.CommitTransaction(); err != nil {
return nil, err
}
return allMessage, nil
}
// 关于里程碑异常的消息通知;早上9点 开始检查,并确认发送的消息
func taskStageAnomaly() (map[string]string, error) {
func saveAllMessagePersonal(msgList []*domain.MessagePersonal) error {
transactionContext, err := factory.CreateTransactionContext(nil)
if err != nil {
return err
}
if err := transactionContext.StartTransaction(); err != nil {
return err
}
defer func() {
_ = transactionContext.RollbackTransaction()
}()
messageRepo := factory.CreateMessagePersonalRepository(map[string]interface{}{
"transactionContext": transactionContext,
})
for _, val := range msgList {
err = messageRepo.Save(val)
if err != nil {
return fmt.Errorf("保存MessagePersonal%s", err)
}
}
if err := transactionContext.CommitTransaction(); err != nil {
return err
}
return nil
}
return nil, nil
func appMessageSend() error {
transactionContext, err := factory.CreateTransactionContext(nil)
if err != nil {
return err
}
if err := transactionContext.StartTransaction(); err != nil {
return err
}
defer func() {
_ = transactionContext.RollbackTransaction()
}()
taskDao := dao.NewTaskDao(map[string]interface{}{"transactionContext": transactionContext})
//获取公司id
companyIds, err := taskDao.TaskStageAnomalyForCompany()
if err != nil {
return fmt.Errorf("获取公司id%s", err)
}
if err := transactionContext.CommitTransaction(); err != nil {
return err
}
var appMessage []*domain.MessagePersonal
for _, val := range companyIds {
messageList, err := messageTaskStageAnomaly(val)
if err != nil {
return fmt.Errorf("生成里程碑异常的消息通知失败%s", err)
}
appMessage = append(appMessage, messageList...)
}
err = saveAllMessagePersonal(appMessage)
if err != nil {
return fmt.Errorf("保存里程碑异常的消息通知失败%s", err)
}
return nil
}
// 关于里程碑异常的消息通知;早上9点 开始检查,并确认发送的消息
func AppMessageRun() {
nowTime := time.Now()
y, m, d := nowTime.Date()
t1 := time.Date(y, m, d, 9, 0, 0, 0, time.Local) //今天的9点
interval := t1.Sub(nowTime)
if interval < 0 {
interval = (24 * time.Hour) + interval
}
timer := time.NewTimer(interval)
for {
<-timer.C
err := appMessageSend()
if err != nil {
log.Logger.Error("发送关于里程碑异常的消息通知:" + err.Error())
}
timer.Reset(24 * time.Hour)
}
}
... ...
... ... @@ -162,9 +162,9 @@ func (srv *MessagePersonalService) TodayMessageTaskStageAnomaly(param *command.G
}
}
// 作为上级,我的下级员工的异常里程碑
var taskStageList2 []dao.TaskStageData
var taskList2 []dao.TaskData3
if len(childUserId) == 0 {
taskStageList2, err = taskDao.TaskStageAnomalyByLeader(childUserId)
taskList2, err = taskDao.TaskStageAnomalyByLeader(childUserId)
if err != nil {
return nil, application.ThrowError(application.INTERNAL_SERVER_ERROR, "检查任务里程碑异常的消息"+err.Error())
}
... ... @@ -176,9 +176,9 @@ func (srv *MessagePersonalService) TodayMessageTaskStageAnomaly(param *command.G
}
}
// 作为上级, 我的下下级员工的异常里程碑
var taskStageList3 []dao.TaskStageData
var taskList3 []dao.TaskData3
if len(childUserId) == 0 {
taskStageList3, err = taskDao.TaskStageAnomalyByLeader(childUserId2)
taskList3, err = taskDao.TaskStageAnomalyByLeader(childUserId2)
if err != nil {
return nil, application.ThrowError(application.INTERNAL_SERVER_ERROR, "检查任务里程碑异常的消息"+err.Error())
}
... ... @@ -186,13 +186,14 @@ func (srv *MessagePersonalService) TodayMessageTaskStageAnomaly(param *command.G
msgList := []adapter.MessageListAdapter{}
for _, val := range taskStageList {
s := fmt.Sprintf("【您负责的项目【%s】里程碑未按时完成,请重点关注,积极寻找上级辅导。】", val.Name)
s := fmt.Sprintf("【您负责的项目【%s】里程碑未按时完成,请重点关注,积极寻找上级辅导。】", val.TaskAlias)
msgList = append(msgList, adapter.MessageListAdapter{
Content: s,
})
payload := map[string]string{
"id": strconv.Itoa(val.Id),
"taskName": val.Name,
"task_id": strconv.Itoa(val.TaskId),
"task_alias": val.TaskAlias,
"task_name": val.TaskName,
}
payloadStr, _ := json.Marshal(payload)
newMessage := domain.MessagePersonal{
... ... @@ -211,14 +212,15 @@ func (srv *MessagePersonalService) TodayMessageTaskStageAnomaly(param *command.G
return nil, application.ThrowError(application.INTERNAL_SERVER_ERROR, err.Error())
}
}
for _, val := range taskStageList2 {
s := fmt.Sprintf("【您下级%s负责的项目【%s】里程碑未按时完成,请前往辅导。】", val.LeaderName, val.Name)
for _, val := range taskList2 {
s := fmt.Sprintf("【您下级%s负责的项目【%s】里程碑未按时完成,请前往辅导。】", val.LeaderName, val.TaskAlias)
msgList = append(msgList, adapter.MessageListAdapter{
Content: s,
})
payload := map[string]string{
"id": strconv.Itoa(val.Id),
"taskName": val.Name,
"task_id": strconv.Itoa(val.TaskId),
"task_alias": val.TaskAlias,
"task_name": val.TaskName,
}
payloadStr, _ := json.Marshal(payload)
newMessage := domain.MessagePersonal{
... ... @@ -237,14 +239,15 @@ func (srv *MessagePersonalService) TodayMessageTaskStageAnomaly(param *command.G
return nil, application.ThrowError(application.INTERNAL_SERVER_ERROR, err.Error())
}
}
for _, val := range taskStageList3 {
s := fmt.Sprintf("【您下级%s关注的项目【%s】里程碑未按时完成,请前往辅导。】", val.LeaderName, val.Name)
for _, val := range taskList3 {
s := fmt.Sprintf("【您下级%s关注的项目【%s】里程碑未按时完成,请前往辅导。】", val.LeaderName, val.TaskAlias)
msgList = append(msgList, adapter.MessageListAdapter{
Content: s,
})
payload := map[string]string{
"id": strconv.Itoa(val.Id),
"taskName": val.Name,
"task_id": strconv.Itoa(val.TaskId),
"task_alias": val.TaskAlias,
"task_name": val.TaskName,
}
payloadStr, _ := json.Marshal(payload)
newMessage := domain.MessagePersonal{
... ... @@ -265,13 +268,14 @@ func (srv *MessagePersonalService) TodayMessageTaskStageAnomaly(param *command.G
}
//我作为任务相关人
for _, val := range taskStageList0 {
s := fmt.Sprintf("【您下级%s关注的项目【%s】里程碑未按时完成,请前往辅导。】", val.LeaderName, val.Name)
s := fmt.Sprintf("【您下级%s关注的项目【%s】里程碑未按时完成,请前往辅导。】", val.LeaderName, val.TaskName)
msgList = append(msgList, adapter.MessageListAdapter{
Content: s,
})
payload := map[string]string{
"id": strconv.Itoa(val.Id),
"taskName": val.Name,
"task_id": strconv.Itoa(val.TaskId),
"task_alias": val.TaskAlias,
"task_name": val.TaskName,
}
payloadStr, _ := json.Marshal(payload)
newMessage := domain.MessagePersonal{
... ...
... ... @@ -25,6 +25,9 @@ var PLATFORM_ADMIN_ID = 28
// 员工绩效平台ID
var PLATFORM_FONT_ID = 29
// app 应用消息通知
var APP_MESSAGE_KEY = "mmm.ability.performance"
func init() {
if os.Getenv("LOG_LEVEL") != "" {
LOG_LEVEL = os.Getenv("LOG_LEVEL")
... ...
... ... @@ -23,6 +23,7 @@ const (
MessageTypesTaskStage MessageTypes = "task_stage" //任务里程碑未按时完成
MessageTypesTaskRecord MessageTypes = "task_record" //任务每日反馈异常
MessageTypesTaskModify MessageTypes = "task_modify" //修改任务
MessageTypesTaskStageApp MessageTypes = "task_stage-app" //任务里程碑未按时完成
)
type MessageReadFlag string
... ...
... ... @@ -385,65 +385,58 @@ func (d *TaskDao) IncreaseAnomaly(id []int, incr int) error {
}
// TaskStageData
type TaskStageData struct {
Id int `pg:"id"`
Name string `pg:"name"`
// type TaskStageData struct {
// TaskStageId int `pg:"task_stage_id"`
// TaskStageName string `pg:"task_stage_name"`
// LeaderName string `pg:"leader_name"`
// TaskAlias string `pg:"task_alias"`
// }
type TaskData3 struct {
TaskId int `pg:"task_id"`
TaskName string `pg:"task_name"`
TaskAlias string `pg:"task_alias"`
LeaderName string `pg:"leader_name"`
}
// 根据负责人获取超期未完成的异常的里程碑任务
func (d *TaskDao) TaskStageAnomalyByLeader(leaderId []string) ([]TaskStageData, error) {
sqlStr := `select
task_stage.id,
task_stage."name" ,
task.leader ->>'name' as leader_name
from task_stage
join task on task.id=task_stage.task_id
func (d *TaskDao) TaskStageAnomalyByLeader(leaderId []string) ([]TaskData3, error) {
sqlStr := `select distinct
task.id as task_id,
task.leader ->>'name' as leader_name,
task.alias as task_alias,
task.name as task_name
from task
join task_stage on task.id=task_stage.task_id
where 1=1
and task_stage.real_completed_at=0 and task_stage.plan_completed_at<extract(epoch from now())
and task.leader ->>'id' in(?) `
result := []TaskStageData{}
result := []TaskData3{}
tx := d.transactionContext.PgTx
_, err := tx.Query(&result, sqlStr, pg.In(leaderId))
return result, err
}
// 根据任务相关人获取超期未完成的异常的里程碑任务
func (d *TaskDao) TaskStageAnomalyByRelatedUser(relatedUserId int) ([]TaskStageData, error) {
sqlStr := `select
task_stage.id,
task_stage."name" ,
task.leader ->>'name' as leader_name
from task_stage
join task on task.id=task_stage.task_id
func (d *TaskDao) TaskStageAnomalyByRelatedUser(relatedUserId int) ([]TaskData3, error) {
sqlStr := `select distinct
task.id as task_id,
task.leader ->>'name' as leader_name,
task.alias as task_alias,
task.name as task_name
from task
join task_stage on task.id=task_stage.task_id
where 1=1
and task_stage.real_completed_at=0 and task_stage.plan_completed_at<extract(epoch from now())
and task.related_user @>? `
relatedUser := fmt.Sprintf("[%d]", relatedUserId)
result := []TaskStageData{}
result := []TaskData3{}
tx := d.transactionContext.PgTx
_, err := tx.Query(&result, sqlStr, relatedUser)
return result, err
}
// TaskStageAnomalyAll 获取所有异常的里程碑任务
func (d TaskDao) TaskStageAnomalyAll() ([]TaskStageData, error) {
sqlStr := `select
task_stage.id,
task_stage."name" ,
task.leader ->>'name' as leader_name
from task_stage
join task on task.id=task_stage.task_id
where 1=1
and task_stage.real_completed_at=0 and task_stage.plan_completed_at<extract(epoch from now()) `
result := []TaskStageData{}
tx := d.transactionContext.PgTx
_, err := tx.Query(&result, sqlStr)
return result, err
}
type TaskData2 struct {
Id int `pg:"id"`
Name string `pg:"name"`
... ... @@ -562,3 +555,54 @@ func (d *TaskDao) ListTask2ForHrbp(userId int, companyId int) ([]TaskData1, erro
return result, err
}
// 获取异常任务对应的公司
func (d *TaskDao) TaskStageAnomalyForCompany() ([]int, error) {
var companyIds []struct {
CompanyId int `pg:"company_id"`
}
sqlStr := `select distinct company_id
from task
where 1=1
and (task.warn_flag >0)`
result := []TaskData1{}
tx := d.transactionContext.PgTx
_, err := tx.Query(&result, sqlStr)
if err != nil {
return nil, err
}
var ids []int
for _, v := range companyIds {
ids = append(ids, v.CompanyId)
}
return ids, nil
}
type TaskData4 struct {
TaskId int `pg:"task_id"`
TaskName string `pg:"task_name"`
TaskAlias string `pg:"task_alias"`
LeaderName string `pg:"leader_name"`
LeaderId int `pg:"leader_id"`
}
// TaskStageAnomalyAll 获取所有异常的里程碑任务
func (d TaskDao) TaskStageAnomalyAll(companyId int) ([]TaskData4, error) {
sqlStr := `select distinct
task.id as task_id,
task."name" as task_name ,
task.alias as task_alias,
task.leader ->>'id' as leader_id,
task.leader ->>'name' as leader_name
from task
join task_stage on task.id=task_stage.task_id
where 1=1
and task_stage.real_completed_at=0
and task_stage.plan_completed_at<extract(epoch from now())
and task.company_id =?
`
result := []TaskData4{}
tx := d.transactionContext.PgTx
_, err := tx.Query(&result, sqlStr, companyId)
return result, err
}
... ...
... ... @@ -31,7 +31,7 @@ func (d *UserDao) AllChildUser(userId int) ([]UserData1, error) {
(
select "user".id,"user".parent_id ,"user".account,"user".name, 1 as "level"
from "user"
where "user".id=?
where "user".id=? and "user".deleted_at isnull
)
union
(
... ... @@ -39,6 +39,7 @@ func (d *UserDao) AllChildUser(userId int) ([]UserData1, error) {
"parent_user"."level"+1 as "level"
from "user" as "child_user"
join t_user as "parent_user" on "parent_user".id="child_user".parent_id
where "child_user".deleted_at isnull
)
)select * from t_user `
result := []UserData1{}
... ... @@ -47,10 +48,35 @@ func (d *UserDao) AllChildUser(userId int) ([]UserData1, error) {
return result, err
}
func (d *UserDao) ChildUser(userId int) ([]UserData1, error) {
sqlStr := `select "user".id,"user".parent_id ,"user".account,"user".name
// func (d *UserDao) ChildUser(userId int) ([]UserData1, error) {
// sqlStr := `select "user".id,"user".parent_id ,"user".account,"user".name
// from "user"
// where "user".parent_id=?`
// result := []UserData1{}
// tx := d.transactionContext.PgTx
// _, err := tx.Query(&result, sqlStr, userId)
// return result, err
// }
// AllParentUser 获取我的全上级
func (d *UserDao) AllParentUser(userId int) ([]UserData1, error) {
sqlStr := `with
-- 人员自身以及全下级
recursive t_user as (
(
select "user".id,"user".parent_id ,"user".account,"user".name,1 as "level"
from "user"
where "user".parent_id=?`
where "user".id=? and "user".deleted_at isnull
)
union
(
select "child_user".id,"child_user".parent_id,"child_user".account,"child_user".name,
"child_user"."level" + 1 as "level"
from "user" as "parent_user"
join t_user as "child_user" on "parent_user".id="child_user".parent_id
where "parent_user".id <>0 and "parent_user".deleted_at isnull
)
)select * from t_user `
result := []UserData1{}
tx := d.transactionContext.PgTx
_, err := tx.Query(&result, sqlStr, userId)
... ...
... ... @@ -18,6 +18,15 @@ type MessageOptions struct {
Receivers []int64 `json:"receivers"` // 消息接收者uid
Title string `json:"title"` // 个推标题
Content string `json:"content"` // 个推内容
Ext MessageExt `json:"ext"` // 为保证请求正常,保留字段. 暂时用不到的字段。
}
type MessageExt struct {
TransData struct {
MmmType string `json:"mmmType"`
MmmTitle string `json:"mmmTitle"`
MmmContent string `json:"mmmContent"`
} `json:"transData"`
}
// PushInfo 个推
... ...