作者 yangfu

refactor: 车间统计增加调试日志,优化统计

@@ -291,12 +291,6 @@ func (productRecordService *ProductRecordService) ProductRecordStatics(cmd *comm @@ -291,12 +291,6 @@ func (productRecordService *ProductRecordService) ProductRecordStatics(cmd *comm
291 }() 291 }()
292 var _ domain.ProductRecordRepository 292 var _ domain.ProductRecordRepository
293 var productRecord *domain.ProductRecord = cmd.ProductRecord 293 var productRecord *domain.ProductRecord = cmd.ProductRecord
294 - //_,productRecord,err = factory.FastPgProductRecord(transactionContext,cmd.ProductRecordId)  
295 - //if err!=nil{  
296 - // log.Logger.Error(err.Error())  
297 - // return nil, nil  
298 - //}  
299 - //  
300 if productRecord == nil { 294 if productRecord == nil {
301 return nil, nil 295 return nil, nil
302 } 296 }
1 package domainService 1 package domainService
2 2
3 import ( 3 import (
  4 + "fmt"
4 "github.com/hibiken/asynq" 5 "github.com/hibiken/asynq"
5 "github.com/linmadan/egglib-go/utils/json" 6 "github.com/linmadan/egglib-go/utils/json"
6 "gitlab.fjmaimaimai.com/allied-creation/allied-creation-manufacture/pkg/constant" 7 "gitlab.fjmaimaimai.com/allied-creation/allied-creation-manufacture/pkg/constant"
7 "gitlab.fjmaimaimai.com/allied-creation/allied-creation-manufacture/pkg/domain" 8 "gitlab.fjmaimaimai.com/allied-creation/allied-creation-manufacture/pkg/domain"
8 ) 9 )
9 10
10 -func SendWorkshopWorkTimeStaticJob(productRecord *domain.ProductAttendanceRecord) error {  
11 - return SendAsyncJob(domain.TaskKeyWorkshopWorkTimeRecordStatics(), productRecord) 11 +const (
  12 + QueueProduct = "product"
  13 + QueueDevice = "device"
  14 + QueueDefault = "default"
  15 +)
  16 +
  17 +func FormatQueue(qt string) string {
  18 + return fmt.Sprintf("%v:queue:%v", constant.CACHE_PREFIX, qt)
12 } 19 }
13 20
14 -func SendProductRecordStaticsJob(productRecord *domain.ProductRecord) error {  
15 - task := asynq.NewTask(domain.TaskKeyPatternProductRecordStatics(), []byte(json.MarshalToString(productRecord))) 21 +func SendWorkshopWorkTimeStaticJob(r *domain.ProductAttendanceRecord) error {
  22 + return SendAsyncJob(domain.TaskKeyWorkshopWorkTimeRecordStatics(), r, asynq.Queue(FormatQueue(QueueDefault)))
  23 +}
16 24
17 - client := asynq.NewClient(asynq.RedisClientOpt{Addr: constant.REDIS_ADDRESS})  
18 - _, err := client.Enqueue(task)  
19 - return err 25 +func SendProductRecordStaticsJob(r *domain.ProductRecord) error {
  26 + return SendAsyncJob(domain.TaskKeyPatternProductRecordStatics(), r, asynq.Queue(FormatQueue(QueueProduct)))
20 } 27 }
21 28
22 -func SendDeviceZkTecoReportJob(productRecord *domain.DeviceZkTeco) error {  
23 - return SendAsyncJob(domain.TaskDeviceZkTecoReport(), productRecord) 29 +func SendDeviceZkTecoReportJob(r *domain.DeviceZkTeco) error {
  30 + return SendAsyncJob(domain.TaskDeviceZkTecoReport(), r, asynq.Queue(FormatQueue(QueueDefault)))
24 } 31 }
25 32
26 -func SendWorkshopDeviceData(productRecord *domain.DeviceCollection) error {  
27 - return SendAsyncJob(domain.TaskDeviceCollection(), productRecord, asynq.Queue("device")) 33 +func SendWorkshopDeviceData(r *domain.DeviceCollection) error {
  34 + return SendAsyncJob(domain.TaskDeviceCollection(), r, asynq.Queue(FormatQueue(QueueDevice)))
28 } 35 }
29 36
30 func SendAsyncJob(queueName string, job interface{}, opts ...asynq.Option) error { 37 func SendAsyncJob(queueName string, job interface{}, opts ...asynq.Option) error {
@@ -27,8 +27,6 @@ func (ptr *PGProductRecordService) EmployeeProductStatics(productRecord *domain. @@ -27,8 +27,6 @@ func (ptr *PGProductRecordService) EmployeeProductStatics(productRecord *domain.
27 var ( 27 var (
28 workshopRepository, _ = repository.NewWorkshopRepository(ptr.transactionContext) 28 workshopRepository, _ = repository.NewWorkshopRepository(ptr.transactionContext)
29 productPlanRepository, _ = repository.NewProductPlanRepository(ptr.transactionContext) 29 productPlanRepository, _ = repository.NewProductPlanRepository(ptr.transactionContext)
30 - productGroupRepository, _ = repository.NewProductGroupRepository(ptr.transactionContext)  
31 - //employeeProductRecordRepository, _ = repository.NewEmployeeProductRecordRepository(ptr.transactionContext)  
32 ) 30 )
33 31
34 var ( 32 var (
@@ -60,9 +58,9 @@ func (ptr *PGProductRecordService) EmployeeProductStatics(productRecord *domain. @@ -60,9 +58,9 @@ func (ptr *PGProductRecordService) EmployeeProductStatics(productRecord *domain.
60 case ProductSection3: 58 case ProductSection3:
61 break 59 break
62 case ProductSection4: //个人特殊处理 60 case ProductSection4: //个人特殊处理
63 - return ptr.personalProductStatics(nil, nil, nil, productRecord) 61 + return ptr.personalProductStatics(nil, productRecord)
64 default: 62 default:
65 - return nil, nil //ptr.personalProductStatics(productRecord) 63 + return nil, nil
66 } 64 }
67 if planId == 0 { 65 if planId == 0 {
68 log.Logger.Debug(fmt.Sprintf("工段:%v product_record 编号:%v 批次为0", productRecord.WorkStation.WorkStationId, productRecord.ProductRecordId)) 66 log.Logger.Debug(fmt.Sprintf("工段:%v product_record 编号:%v 批次为0", productRecord.WorkStation.WorkStationId, productRecord.ProductRecordId))
@@ -74,9 +72,6 @@ func (ptr *PGProductRecordService) EmployeeProductStatics(productRecord *domain. @@ -74,9 +72,6 @@ func (ptr *PGProductRecordService) EmployeeProductStatics(productRecord *domain.
74 return nil, err 72 return nil, err
75 } 73 }
76 74
77 - // 2.1 判断是否是支援类型 有打卡记录,员工是否是属于该工段的员工  
78 - groupMembers, groupMembersKeyFunc := FindGroupMembers(productGroupRepository, cid, oid, productRecord.WorkStation.WorkStationId)  
79 -  
80 // 集体 75 // 集体
81 // 1.查询员工 -》 员工打卡记录 工位+打卡日期 76 // 1.查询员工 -》 员工打卡记录 工位+打卡日期
82 // 2.打卡记录的时间区间 在生产记录上报的时间范围内 77 // 2.打卡记录的时间区间 在生产记录上报的时间范围内
@@ -91,8 +86,8 @@ func (ptr *PGProductRecordService) EmployeeProductStatics(productRecord *domain. @@ -91,8 +86,8 @@ func (ptr *PGProductRecordService) EmployeeProductStatics(productRecord *domain.
91 productRecord.ProductWorker = r.ProductWorker 86 productRecord.ProductWorker = r.ProductWorker
92 // 3.查询员工产能记录 -》员工 批次+工位+批次生产日期 (有 更新产能数据、没有插入一条产能数据) 87 // 3.查询员工产能记录 -》员工 批次+工位+批次生产日期 (有 更新产能数据、没有插入一条产能数据)
93 // 4.更新产能 (产能、二级品) (特殊工段处理 打料、成型) 88 // 4.更新产能 (产能、二级品) (特殊工段处理 打料、成型)
94 - // 个人  
95 - if _, err := ptr.personalProductStatics(productPlan, groupMembers, groupMembersKeyFunc, productRecord); err != nil { 89 + // 个人产能统计
  90 + if _, err := ptr.personalProductStatics(productPlan, productRecord); err != nil {
96 return nil, err 91 return nil, err
97 } 92 }
98 } 93 }
@@ -174,9 +169,8 @@ func FindGroupMembers(productGroupRepository domain.ProductGroupRepository, comp @@ -174,9 +169,8 @@ func FindGroupMembers(productGroupRepository domain.ProductGroupRepository, comp
174 } 169 }
175 170
176 // 个人生产记录统计 171 // 个人生产记录统计
177 -func (ptr *PGProductRecordService) personalProductStatics(productPlan *domain.ProductPlan, groupMembers map[string]*domain.User, groupMembersKeyFunc func(int) string, productRecord *domain.ProductRecord) (interface{}, error) { 172 +func (ptr *PGProductRecordService) personalProductStatics(productPlan *domain.ProductPlan, productRecord *domain.ProductRecord) (interface{}, error) {
178 var ( 173 var (
179 - //workshopRepository,_=repository.NewWorkshopRepository(ptr.transactionContext)  
180 productPlanRepository, _ = repository.NewProductPlanRepository(ptr.transactionContext) 174 productPlanRepository, _ = repository.NewProductPlanRepository(ptr.transactionContext)
181 productGroupRepository, _ = repository.NewProductGroupRepository(ptr.transactionContext) 175 productGroupRepository, _ = repository.NewProductGroupRepository(ptr.transactionContext)
182 employeeProductRecordRepository, _ = repository.NewEmployeeProductRecordRepository(ptr.transactionContext) 176 employeeProductRecordRepository, _ = repository.NewEmployeeProductRecordRepository(ptr.transactionContext)
@@ -185,20 +179,18 @@ func (ptr *PGProductRecordService) personalProductStatics(productPlan *domain.Pr @@ -185,20 +179,18 @@ func (ptr *PGProductRecordService) personalProductStatics(productPlan *domain.Pr
185 cid = productRecord.CompanyId 179 cid = productRecord.CompanyId
186 oid = productRecord.OrgId 180 oid = productRecord.OrgId
187 planId = productRecord.ProductRecordInfo.ProductPlanId 181 planId = productRecord.ProductRecordInfo.ProductPlanId
188 - //productPlan *domain.ProductPlan  
189 err error 182 err error
  183 + workStationId = productRecord.WorkStation.WorkStationId
190 ) 184 )
191 // 2.1 判断是否是支援类型 有打卡记录,员工是否是属于该工段的员工 185 // 2.1 判断是否是支援类型 有打卡记录,员工是否是属于该工段的员工
192 - if groupMembers == nil {  
193 - groupMembers, groupMembersKeyFunc = FindGroupMembers(productGroupRepository, cid, oid, productRecord.WorkStation.WorkStationId)  
194 - } 186 + groupMembers, groupMembersKeyFunc := FindGroupMembers(productGroupRepository, cid, oid, workStationId)
  187 +
195 if productPlan == nil { 188 if productPlan == nil {
196 productPlan, err = productPlanRepository.FindOne(map[string]interface{}{"productPlanId": planId}) 189 productPlan, err = productPlanRepository.FindOne(map[string]interface{}{"productPlanId": planId})
197 if err != nil { 190 if err != nil {
198 return nil, err 191 return nil, err
199 } 192 }
200 } 193 }
201 - workStationId := productRecord.WorkStation.WorkStationId  
202 194
203 employeeProductRecordDao, _ := dao.NewEmployeeProductRecordDao(ptr.transactionContext) 195 employeeProductRecordDao, _ := dao.NewEmployeeProductRecordDao(ptr.transactionContext)
204 196
@@ -231,11 +223,13 @@ func (ptr *PGProductRecordService) personalProductStatics(productPlan *domain.Pr @@ -231,11 +223,13 @@ func (ptr *PGProductRecordService) personalProductStatics(productPlan *domain.Pr
231 } 223 }
232 } 224 }
233 } 225 }
  226 + log.Logger.Debug(fmt.Sprintf("产能统计 员工:%v(%v) 当前产能:%v kg 新增产能:%v kg 原始数量:%v",
  227 + employeeProductRecord.ProductWorker.UserName, employeeProductRecord.ProductWorker.UserId,
  228 + employeeProductRecord.ProductWeigh, productRecord.ProductRecordInfo.Weigh, productRecord.ProductRecordInfo.Original))
234 229
235 employeeProductRecord.UpdateProductWeigh(productRecord, yesterdayOutputWeight, bestOutputWeight) 230 employeeProductRecord.UpdateProductWeigh(productRecord, yesterdayOutputWeight, bestOutputWeight)
236 231
237 if employeeProductRecord, err = employeeProductRecordRepository.Save(employeeProductRecord); err != nil { 232 if employeeProductRecord, err = employeeProductRecordRepository.Save(employeeProductRecord); err != nil {
238 - // TODO:异常处理  
239 log.Logger.Error(fmt.Sprintf("生产记录:[%v] 员工:[%v] 处理异常:%v", productRecord.ProductRecordId, productRecord.ProductWorker.UserId, err.Error())) 233 log.Logger.Error(fmt.Sprintf("生产记录:[%v] 员工:[%v] 处理异常:%v", productRecord.ProductRecordId, productRecord.ProductWorker.UserId, err.Error()))
240 } 234 }
241 return nil, nil 235 return nil, nil
@@ -246,7 +240,6 @@ func (ptr *PGProductRecordService) WorkshopProductStatics(productRecord *domain. @@ -246,7 +240,6 @@ func (ptr *PGProductRecordService) WorkshopProductStatics(productRecord *domain.
246 var ( 240 var (
247 workshopRepository, _ = repository.NewWorkshopRepository(ptr.transactionContext) 241 workshopRepository, _ = repository.NewWorkshopRepository(ptr.transactionContext)
248 productPlanRepository, _ = repository.NewProductPlanRepository(ptr.transactionContext) 242 productPlanRepository, _ = repository.NewProductPlanRepository(ptr.transactionContext)
249 - //productGroupRepository, _ = repository.NewProductGroupRepository(ptr.transactionContext)  
250 workshopProductRecordRepository, _ = repository.NewWorkshopProductRecordRepository(ptr.transactionContext) 243 workshopProductRecordRepository, _ = repository.NewWorkshopProductRecordRepository(ptr.transactionContext)
251 ) 244 )
252 245
@@ -295,6 +288,9 @@ func (ptr *PGProductRecordService) WorkshopProductStatics(productRecord *domain. @@ -295,6 +288,9 @@ func (ptr *PGProductRecordService) WorkshopProductStatics(productRecord *domain.
295 return nil, nil 288 return nil, nil
296 } 289 }
297 } 290 }
  291 + log.Logger.Debug(fmt.Sprintf("产能统计 工位:%v(%v) 当前产能:%v kg 新增产能:%v kg 原始数量:%v",
  292 + workshopProductRecord.WorkStation.WorkStationId, workshopProductRecord.WorkStation.SectionName,
  293 + workshopProductRecord.ProductWeigh, productRecord.ProductRecordInfo.Weigh, productRecord.ProductRecordInfo.Original))
298 workshopProductRecord.UpdateProductWeigh(productRecord) 294 workshopProductRecord.UpdateProductWeigh(productRecord)
299 // 打料 跟 成型工段的初始产能是批次的产能 295 // 打料 跟 成型工段的初始产能是批次的产能
300 if productRecord.WorkStation.SectionName == ProductSection1 && productRecord.WorkStation.SectionName == ProductSection2 { 296 if productRecord.WorkStation.SectionName == ProductSection1 && productRecord.WorkStation.SectionName == ProductSection2 {
@@ -96,10 +96,11 @@ func (ptr *PGWorkshopDataConsumeService) Consume(companyId, orgId int, record *d @@ -96,10 +96,11 @@ func (ptr *PGWorkshopDataConsumeService) Consume(companyId, orgId int, record *d
96 if record.DeviceType == domain.DeviceTypeChuanChuanJi && plan != nil && deviceRunningData.Count > 0 { 96 if record.DeviceType == domain.DeviceTypeChuanChuanJi && plan != nil && deviceRunningData.Count > 0 {
97 97
98 productRecord, _ := ptr.newProductRecord(companyId, orgId, workStation, device, deviceRunningData, plan) 98 productRecord, _ := ptr.newProductRecord(companyId, orgId, workStation, device, deviceRunningData, plan)
99 - //if _, err = deviceRunningRecordRepository.Save(deviceRunningRecord); err != nil {  
100 - // return nil, err  
101 - //}  
102 - SendProductRecordStaticsJob(productRecord) 99 + //SendProductRecordStaticsJob(productRecord)
  100 +
  101 + productRecordService, _ := NewPGProductRecordService(ptr.transactionContext)
  102 + productRecordService.EmployeeProductStatics(productRecord)
  103 + productRecordService.WorkshopProductStatics(productRecord)
103 } 104 }
104 105
105 // 3.更新 设备每日运行记录(汇总) - redis更新 十分钟异步刷库 106 // 3.更新 设备每日运行记录(汇总) - redis更新 十分钟异步刷库
@@ -5,6 +5,7 @@ import ( @@ -5,6 +5,7 @@ import (
5 "github.com/hibiken/asynq" 5 "github.com/hibiken/asynq"
6 "gitlab.fjmaimaimai.com/allied-creation/allied-creation-manufacture/pkg/constant" 6 "gitlab.fjmaimaimai.com/allied-creation/allied-creation-manufacture/pkg/constant"
7 "gitlab.fjmaimaimai.com/allied-creation/allied-creation-manufacture/pkg/domain" 7 "gitlab.fjmaimaimai.com/allied-creation/allied-creation-manufacture/pkg/domain"
  8 + "gitlab.fjmaimaimai.com/allied-creation/allied-creation-manufacture/pkg/infrastructure/domainService"
8 "gitlab.fjmaimaimai.com/allied-creation/allied-creation-manufacture/pkg/log" 9 "gitlab.fjmaimaimai.com/allied-creation/allied-creation-manufacture/pkg/log"
9 "os" 10 "os"
10 "os/signal" 11 "os/signal"
@@ -20,12 +21,15 @@ func Run() { @@ -20,12 +21,15 @@ func Run() {
20 srv := asynq.NewServer( 21 srv := asynq.NewServer(
21 asynq.RedisClientOpt{Addr: constant.REDIS_ADDRESS}, 22 asynq.RedisClientOpt{Addr: constant.REDIS_ADDRESS},
22 asynq.Config{ 23 asynq.Config{
23 - Concurrency: 2, 24 + Concurrency: 4,
24 Queues: map[string]int{ 25 Queues: map[string]int{
25 //"critical": 1, 26 //"critical": 1,
26 "default": 1, 27 "default": 1,
27 - "device": 1, 28 + domainService.FormatQueue(domainService.QueueDevice): 1,
  29 + domainService.FormatQueue(domainService.QueueProduct): 1,
  30 + domainService.FormatQueue(domainService.QueueDefault): 1,
28 }, 31 },
  32 + StrictPriority: true,
29 }, 33 },
30 ) 34 )
31 35
@@ -17,8 +17,8 @@ func HandlerProductRecordStatics(c context.Context, t *asynq.Task) error { @@ -17,8 +17,8 @@ func HandlerProductRecordStatics(c context.Context, t *asynq.Task) error {
17 if err := json.Unmarshal(t.Payload(), cmd); err != nil { 17 if err := json.Unmarshal(t.Payload(), cmd); err != nil {
18 return err 18 return err
19 } 19 }
20 - log.Logger.Debug(fmt.Sprintf("【生产记录统计】 消费 生产记录ID:%v 类型:%v 工段:%v(%v) 重量:%v", cmd.ProductRecordId, cmd.ProductRecordType,  
21 - cmd.WorkStation.SectionName, cmd.WorkStation.WorkStationId, cmd.ProductRecordInfo.Weigh)) 20 + log.Logger.Debug(fmt.Sprintf("【生产记录统计】 消费 生产记录ID:%v 类型:%v 工段:%v(%v) 重量:%v 记录时间:%v", cmd.ProductRecordId, cmd.ProductRecordType,
  21 + cmd.WorkStation.SectionName, cmd.WorkStation.WorkStationId, cmd.ProductRecordInfo.Weigh, cmd.CreatedAt))
22 _, err := productPlanService.ProductRecordStatics(cmd) 22 _, err := productPlanService.ProductRecordStatics(cmd)
23 if err != nil { 23 if err != nil {
24 log.Logger.Error(err.Error()) 24 log.Logger.Error(err.Error())