切换导航条
此项目
正在载入...
登录
allied-creation
/
performance
·
提交
转到一个项目
GitLab
转到群组
项目
活动
文件
提交
管道
0
构建
0
图表
里程碑
问题
0
合并请求
0
成员
标记
维基
派生
网络
创建新的问题
下载为
差异文件
浏览文件
作者
郑周
2 years ago
提交
b4b56edc943f4ca52750c1059a40c7063fb8e661
2 个父辈
12609834
de1b8d5b
Merge branch 'dev-zhengzhou' into test
隐藏空白字符变更
内嵌
并排对比
正在显示
2 个修改的文件
包含
246 行增加
和
94 行删除
pkg/application/node_task/node_task_service.go
pkg/infrastructure/repository/pg_node_task_repository.go
pkg/application/node_task/node_task_service.go
查看文件 @
b4b56ed
...
...
@@ -33,143 +33,295 @@ func (rs *NodeTaskService) SendEvaluationNode() error {
transactionContext
.
RollbackTransaction
()
if
err
:=
recover
();
err
!=
nil
{
log
.
Logger
.
Error
(
application
.
ThrowError
(
application
.
BUSINESS_ERROR
,
fmt
.
Sprintf
(
"定时发送评估任务异常:%s"
,
err
))
.
Error
())
log
.
Logger
.
Error
(
application
.
ThrowError
(
application
.
BUSINESS_ERROR
,
fmt
.
Sprintf
(
"定时发送评估任务
[基础查询]
异常:%s"
,
err
))
.
Error
())
}
}()
taskRepository
:=
factory
.
CreateNodeTaskRepository
(
map
[
string
]
interface
{}{
"transactionContext"
:
transactionContext
})
tasks
,
err
:=
taskRepository
.
Find
(
map
[
string
]
interface
{}{
"
now
"
:
time
.
Now
()
.
Local
()})
tasks
,
err
:=
taskRepository
.
Find
(
map
[
string
]
interface
{}{
"
lessNextSentAt
"
:
time
.
Now
()
.
Local
()})
if
err
!=
nil
{
return
application
.
ThrowError
(
application
.
INTERNAL_SERVER_ERROR
,
err
.
Error
())
}
if
len
(
tasks
)
==
0
{
return
nil
}
projectRepository
:=
factory
.
CreateEvaluationProjectRepository
(
map
[
string
]
interface
{}{
"transactionContext"
:
transactionContext
})
cycleRepository
:=
factory
.
CreateEvaluationCycleRepository
(
map
[
string
]
interface
{}{
"transactionContext"
:
transactionContext
})
projectIdsMap
:=
map
[
int64
]
*
domain
.
EvaluationProject
{}
cycleIdsMap
:=
map
[
int64
]
*
domain
.
EvaluationCycle
{}
projectMap
:=
map
[
int64
]
*
domain
.
EvaluationProject
{}
cycleMap
:=
map
[
int64
]
*
domain
.
EvaluationCycle
{}
for
i
:=
range
tasks
{
task
:=
tasks
[
i
]
projectIdsMap
[
task
.
ProjectId
]
=
nil
cycleIdsMap
[
task
.
CycleId
]
=
nil
projectMap
[
tasks
[
i
]
.
ProjectId
]
=
nil
cycleMap
[
tasks
[
i
]
.
CycleId
]
=
nil
}
projectIds
:=
make
([]
int64
,
0
)
cycleIds
:=
make
([]
int64
,
0
)
for
k
:=
range
project
Ids
Map
{
for
k
:=
range
projectMap
{
projectIds
=
append
(
projectIds
,
k
)
}
for
k
:=
range
cycle
Ids
Map
{
for
k
:=
range
cycleMap
{
cycleIds
=
append
(
cycleIds
,
k
)
}
_
,
projects
,
err
:=
projectRepository
.
Find
(
map
[
string
]
interface
{}{
"ids"
:
projectIds
},
"template"
)
if
err
!=
nil
{
return
application
.
ThrowError
(
application
.
INTERNAL_SERVER_ERROR
,
err
.
Error
())
if
len
(
projectIds
)
>
0
{
projectRepository
:=
factory
.
CreateEvaluationProjectRepository
(
map
[
string
]
interface
{}{
"transactionContext"
:
transactionContext
})
_
,
projects
,
err
:=
projectRepository
.
Find
(
map
[
string
]
interface
{}{
"ids"
:
projectIds
},
"template"
)
if
err
!=
nil
{
return
application
.
ThrowError
(
application
.
INTERNAL_SERVER_ERROR
,
err
.
Error
())
}
for
i
:=
range
projects
{
projectMap
[
projects
[
i
]
.
Id
]
=
projects
[
i
]
}
}
_
,
cycles
,
err
:=
cycleRepository
.
Find
(
map
[
string
]
interface
{}{
"ids"
:
cycleIds
})
if
err
!=
nil
{
return
application
.
ThrowError
(
application
.
INTERNAL_SERVER_ERROR
,
err
.
Error
())
if
len
(
cycleIds
)
>
0
{
cycleRepository
:=
factory
.
CreateEvaluationCycleRepository
(
map
[
string
]
interface
{}{
"transactionContext"
:
transactionContext
})
_
,
cycles
,
err
:=
cycleRepository
.
Find
(
map
[
string
]
interface
{}{
"ids"
:
cycleIds
})
if
err
!=
nil
{
return
application
.
ThrowError
(
application
.
INTERNAL_SERVER_ERROR
,
err
.
Error
())
}
for
i
:=
range
cycles
{
cycleMap
[
cycles
[
i
]
.
Id
]
=
cycles
[
i
]
}
}
if
err
=
transactionContext
.
CommitTransaction
();
err
!=
nil
{
return
application
.
ThrowError
(
application
.
TRANSACTION_ERROR
,
err
.
Error
())
}
for
i
:=
range
projects
{
projectIdsMap
[
projects
[
i
]
.
Id
]
=
projects
[
i
]
// 相同项目节点先聚合
taskMap
:=
map
[
int64
][]
*
domain
.
NodeTask
{}
for
i
:=
range
tasks
{
task
:=
tasks
[
i
]
array
,
ok
:=
taskMap
[
task
.
ProjectId
]
if
!
ok
{
array
=
make
([]
*
domain
.
NodeTask
,
0
)
}
taskMap
[
task
.
ProjectId
]
=
append
(
array
,
task
)
}
for
i
:=
range
cycles
{
cycleIdsMap
[
cycles
[
i
]
.
Id
]
=
cycles
[
i
]
for
k
,
v
:=
range
taskMap
{
project
,
ok
:=
projectMap
[
k
]
if
ok
&&
project
!=
nil
{
if
err
=
rs
.
taskSend
(
project
,
v
,
cycleMap
);
err
!=
nil
{
return
err
}
}
else
{
if
err
=
rs
.
taskAbort
(
v
);
err
!=
nil
{
return
err
}
}
}
staffAssessService
:=
service
.
NewStaffAssessServeice
()
//staffAssessService := service.NewStaffAssessServeice()
//now := time.Now().Local()
//for i := range tasks {
// task := tasks[i]
// project, ok := projectMap[task.ProjectId] // 项目还存在
// if ok && project != nil {
// // 环节截止时间
// maxTime := task.TimeEnd.Local()
//
// // 更新任务最后一次的发送时间(取当前时间)
// task.LastSentAt = &now
//
// // 当前周起始时间和截止时间
// var cycleTimeStart = task.NextSentAt.Local()
// var cycleTimeEnd time.Time
//
// // 下个周期起始时间
// nextTime := utils.NextTimeInc(cycleTimeStart, task.KpiCycle)
// // 超过截止时间
// if nextTime.After(maxTime) {
// task.NextSentAt = nil
// } else {
// task.NextSentAt = &nextTime
// }
//
// // 周期的截至时间=下一个周期的开始时间-1秒(需求方要求提交数据时间延长到第二天8点30分截止)
// if task.NextSentAt == nil {
// //cycleTimeEnd = maxTime
// maxYear, maxMonth, maxDay := maxTime.Date()
// cycleTimeEnd = time.Date(maxYear, maxMonth, maxDay, 0, 0, 0, 0, time.Local)
// cycleTimeEnd = cycleTimeEnd.Add(24*time.Hour + 8*time.Hour + 30*time.Minute) // 注.延长8.5小时
// } else {
// //cycleTimeEnd = task.NextSentAt.Local().Add(-1 * time.Second) // 周期截至时间=下一个周期起始时间-1秒
// cycleTimeEnd = task.NextSentAt.Local().Add(8*time.Hour + 30*time.Minute) // 注.延长8.5小时
// }
//
// // 格式化周期的起始和截止时间
// fmCycleStartTime := cycleTimeStart.Format("2006-01-02 15:04:05")
// fmCycleTimeEnd := cycleTimeEnd.Format("2006-01-02 15:04:05")
//
// csat := &command.CreateStaffAssessTask{
// CompanyId: int(project.CompanyId),
// EvaluationProjectId: int(project.Id),
// EvaluationProjectName: project.Name,
// CycleId: project.CycleId,
// StepList: make([]command.AssessTaskStep, 0),
// }
//
// // 周期名称
// if cycle, ok := cycleMap[project.CycleId]; ok {
// csat.CycleName = cycle.Name
// }
//
// // 接收人
// csat.ExecutorId = make([]int, 0)
// for rIndex := range project.Recipients {
// vInt, _ := strconv.Atoi(project.Recipients[rIndex])
// csat.ExecutorId = append(csat.ExecutorId, vInt)
// }
//
// csat.BeginTime = fmCycleStartTime
// csat.EndTime = fmCycleTimeEnd
// csat.StepList = append(csat.StepList, command.AssessTaskStep{
// SortBy: task.NodeSort,
// LinkNodeId: int(task.NodeId),
// LinkNodeName: task.NodeName,
// LinkNodeType: task.NodeType,
// BeginTime: fmCycleStartTime,
// EndTime: fmCycleTimeEnd,
// })
//
// // 创建发送任务
// _, err := staffAssessService.CreateStaffAssessTask(transactionContext, csat)
// if err != nil {
// return application.ThrowError(application.INTERNAL_SERVER_ERROR, "创建发送任务"+err.Error())
// }
// } else {
// task.NextSentAt = nil // 项目不存在,取消周期任务发送
// }
//
// task, err := taskRepository.Insert(task)
// if err != nil {
// return application.ThrowError(application.INTERNAL_SERVER_ERROR, err.Error())
// }
//}
return
nil
}
func
(
rs
*
NodeTaskService
)
taskSend
(
project
*
domain
.
EvaluationProject
,
tasks
[]
*
domain
.
NodeTask
,
cycleMap
map
[
int64
]
*
domain
.
EvaluationCycle
)
error
{
transactionContext
,
err
:=
factory
.
StartTransaction
()
if
err
!=
nil
{
return
err
}
defer
func
()
{
_
=
transactionContext
.
RollbackTransaction
()
if
err
:=
recover
();
err
!=
nil
{
log
.
Logger
.
Error
(
application
.
ThrowError
(
application
.
BUSINESS_ERROR
,
fmt
.
Sprintf
(
"定时发送评估任务异常:%s"
,
err
))
.
Error
())
}
}()
taskRepository
:=
factory
.
CreateNodeTaskRepository
(
map
[
string
]
interface
{}{
"transactionContext"
:
transactionContext
})
now
:=
time
.
Now
()
.
Local
()
csat
:=
&
command
.
CreateStaffAssessTask
{
CompanyId
:
int
(
project
.
CompanyId
),
EvaluationProjectId
:
int
(
project
.
Id
),
EvaluationProjectName
:
project
.
Name
,
CycleId
:
project
.
CycleId
,
StepList
:
make
([]
command
.
AssessTaskStep
,
0
),
}
// 周期名称
if
cycle
,
ok
:=
cycleMap
[
project
.
CycleId
];
ok
{
csat
.
CycleName
=
cycle
.
Name
}
// 接收人
csat
.
ExecutorId
=
make
([]
int
,
0
)
for
rIndex
:=
range
project
.
Recipients
{
vInt
,
_
:=
strconv
.
Atoi
(
project
.
Recipients
[
rIndex
])
csat
.
ExecutorId
=
append
(
csat
.
ExecutorId
,
vInt
)
}
for
i
:=
range
tasks
{
task
:=
tasks
[
i
]
project
,
ok
:=
projectIdsMap
[
task
.
ProjectId
]
// 项目
if
ok
&&
project
!=
nil
{
// 环节截止时间
maxTime
:=
task
.
TimeEnd
.
Local
()
// 更新任务最后一次的发送时间(取当前时间)
task
.
LastSentAt
=
&
now
// 当前周起始时间和截止时间
var
cycleTimeStart
=
task
.
NextSentAt
.
Local
()
var
cycleTimeEnd
time
.
Time
// 下个周期起始时间
nextTime
:=
utils
.
NextTimeInc
(
cycleTimeStart
,
task
.
KpiCycle
)
// 超过截止时间
if
nextTime
.
After
(
maxTime
)
{
task
.
NextSentAt
=
nil
}
else
{
task
.
NextSentAt
=
&
nextTime
}
// 周期的截至时间=下一个周期的开始时间-1秒(需求方要求提交数据时间延长到第二天8点30分截止)
if
task
.
NextSentAt
==
nil
{
//cycleTimeEnd = maxTime
maxYear
,
maxMonth
,
maxDay
:=
maxTime
.
Date
()
cycleTimeEnd
=
time
.
Date
(
maxYear
,
maxMonth
,
maxDay
,
0
,
0
,
0
,
0
,
time
.
Local
)
cycleTimeEnd
=
cycleTimeEnd
.
Add
(
24
*
time
.
Hour
+
8
*
time
.
Hour
+
30
*
time
.
Minute
)
// 注.延长8.5小时
}
else
{
//cycleTimeEnd = task.NextSentAt.Local().Add(-1 * time.Second) // 周期截至时间=下一个周期起始时间-1秒
cycleTimeEnd
=
task
.
NextSentAt
.
Local
()
.
Add
(
8
*
time
.
Hour
+
30
*
time
.
Minute
)
// 注.延长8.5小时
}
// 环节截止时间
maxTime
:=
task
.
TimeEnd
.
Local
()
// 格式化周期的起始和截止时间
fmCycleStartTime
:=
cycleTimeStart
.
Format
(
"2006-01-02 15:04:05"
)
fmCycleTimeEnd
:=
cycleTimeEnd
.
Format
(
"2006-01-02 15:04:05"
)
// 更新任务最后一次的发送时间(取当前时间)
task
.
LastSentAt
=
&
now
csat
:=
&
command
.
CreateStaffAssessTask
{
CompanyId
:
int
(
project
.
CompanyId
),
EvaluationProjectId
:
int
(
project
.
Id
),
EvaluationProjectName
:
project
.
Name
,
CycleId
:
project
.
CycleId
,
StepList
:
make
([]
command
.
AssessTaskStep
,
0
),
}
// 当前小周期范围[起始时间-截止时间]
var
cycleTimeStart
=
task
.
NextSentAt
.
Local
()
var
cycleTimeEnd
time
.
Time
// 周期名称
if
cycle
,
ok
:=
cycleIdsMap
[
project
.
CycleId
];
ok
{
csat
.
CycleName
=
cycle
.
Name
}
// 下个周期起始时间
nextTime
:=
utils
.
NextTimeInc
(
cycleTimeStart
,
task
.
KpiCycle
)
// 超过截止时间
if
nextTime
.
After
(
maxTime
)
{
task
.
NextSentAt
=
nil
}
else
{
task
.
NextSentAt
=
&
nextTime
}
// 更新下个周期
_
,
err
=
taskRepository
.
Insert
(
task
)
if
err
!=
nil
{
return
application
.
ThrowError
(
application
.
INTERNAL_SERVER_ERROR
,
err
.
Error
())
}
// 接收人
csat
.
ExecutorId
=
make
([]
int
,
0
)
for
rIndex
:=
range
project
.
Recipients
{
vInt
,
_
:=
strconv
.
Atoi
(
project
.
Recipients
[
rIndex
])
csat
.
ExecutorId
=
append
(
csat
.
ExecutorId
,
vInt
)
}
// 周期的截至时间=下一个周期的开始时间-1秒(需求方要求:提交数据时间延长到第二天8点30分截止)
if
task
.
NextSentAt
==
nil
{
//cycleTimeEnd = maxTime
maxYear
,
maxMonth
,
maxDay
:=
maxTime
.
Date
()
cycleTimeEnd
=
time
.
Date
(
maxYear
,
maxMonth
,
maxDay
,
0
,
0
,
0
,
0
,
time
.
Local
)
cycleTimeEnd
=
cycleTimeEnd
.
Add
(
24
*
time
.
Hour
+
8
*
time
.
Hour
+
30
*
time
.
Minute
)
// 注.延长8.5小时
}
else
{
//cycleTimeEnd = task.NextSentAt.Local().Add(-1 * time.Second) // 周期截至时间=下一个周期起始时间-1秒
cycleTimeEnd
=
task
.
NextSentAt
.
Local
()
.
Add
(
8
*
time
.
Hour
+
30
*
time
.
Minute
)
// 注.延长8.5小时
}
// 格式化周期的起始和截止时间
fmCycleStartTime
:=
cycleTimeStart
.
Format
(
"2006-01-02 15:04:05"
)
fmCycleTimeEnd
:=
cycleTimeEnd
.
Format
(
"2006-01-02 15:04:05"
)
// 格式化周期的起始和截止时间
if
len
(
csat
.
BeginTime
)
==
0
{
csat
.
BeginTime
=
fmCycleStartTime
}
if
len
(
csat
.
EndTime
)
==
0
{
csat
.
EndTime
=
fmCycleTimeEnd
csat
.
StepList
=
append
(
csat
.
StepList
,
command
.
AssessTaskStep
{
SortBy
:
task
.
NodeSort
,
LinkNodeId
:
int
(
task
.
NodeId
),
LinkNodeName
:
task
.
NodeName
,
LinkNodeType
:
task
.
NodeType
,
BeginTime
:
fmCycleStartTime
,
EndTime
:
fmCycleTimeEnd
,
})
// 创建发送任务
_
,
err
:=
staffAssessService
.
CreateStaffAssessTask
(
transactionContext
,
csat
)
if
err
!=
nil
{
return
application
.
ThrowError
(
application
.
INTERNAL_SERVER_ERROR
,
"创建发送任务"
+
err
.
Error
())
}
}
else
{
task
.
NextSentAt
=
nil
// 项目不存在,取消周期任务发送
}
csat
.
StepList
=
append
(
csat
.
StepList
,
command
.
AssessTaskStep
{
SortBy
:
task
.
NodeSort
,
LinkNodeId
:
int
(
task
.
NodeId
),
LinkNodeName
:
task
.
NodeName
,
LinkNodeType
:
task
.
NodeType
,
BeginTime
:
fmCycleStartTime
,
EndTime
:
fmCycleTimeEnd
,
})
}
task
,
err
:=
taskRepository
.
Insert
(
task
)
if
err
!=
nil
{
return
application
.
ThrowError
(
application
.
INTERNAL_SERVER_ERROR
,
err
.
Error
())
}
// 创建发送任务
_
,
err
=
service
.
NewStaffAssessServeice
()
.
CreateStaffAssessTask
(
transactionContext
,
csat
)
if
err
!=
nil
{
return
application
.
ThrowError
(
application
.
INTERNAL_SERVER_ERROR
,
"创建发送任务"
+
err
.
Error
())
}
if
err
:
=
transactionContext
.
CommitTransaction
();
err
!=
nil
{
if
err
=
transactionContext
.
CommitTransaction
();
err
!=
nil
{
return
application
.
ThrowError
(
application
.
TRANSACTION_ERROR
,
err
.
Error
())
}
return
nil
}
// 节点任务中止
func
(
rs
*
NodeTaskService
)
taskAbort
(
tasks
[]
*
domain
.
NodeTask
)
error
{
transactionContext
,
err
:=
factory
.
StartTransaction
()
if
err
!=
nil
{
return
err
}
defer
func
()
{
transactionContext
.
RollbackTransaction
()
}()
taskRepository
:=
factory
.
CreateNodeTaskRepository
(
map
[
string
]
interface
{}{
"transactionContext"
:
transactionContext
})
for
i
:=
range
tasks
{
task
:=
tasks
[
i
]
task
.
NextSentAt
=
nil
// 项目不存在,取消周期任务发送
_
,
err
=
taskRepository
.
Insert
(
task
)
if
err
!=
nil
{
return
application
.
ThrowError
(
application
.
INTERNAL_SERVER_ERROR
,
err
.
Error
())
}
}
if
err
=
transactionContext
.
CommitTransaction
();
err
!=
nil
{
return
application
.
ThrowError
(
application
.
TRANSACTION_ERROR
,
err
.
Error
())
}
return
nil
}
...
...
pkg/infrastructure/repository/pg_node_task_repository.go
查看文件 @
b4b56ed
...
...
@@ -131,7 +131,7 @@ func (repo *NodeTaskRepository) Find(queryOptions map[string]interface{}) ([]*do
var
m
[]
*
models
.
NodeTask
query
:=
tx
.
Model
(
&
m
)
.
Where
(
"deleted_at isnull"
)
if
v
,
ok
:=
queryOptions
[
"
now
"
]
.
(
time
.
Time
);
ok
{
if
v
,
ok
:=
queryOptions
[
"
lessNextSentAt
"
]
.
(
time
.
Time
);
ok
{
query
.
Where
(
"next_sent_at <= ?"
,
v
)
}
...
...
请
注册
或
登录
后发表评论