作者 yangfu

feat:1.增加金蝶生产计划定时同步

... ... @@ -54,6 +54,9 @@ func (crontabService *CrontabService) initTask() {
autoTodayWorkshopPlanCompletionRecord := task.NewTask("定时刷新当天车间计划完成纪录", "0 0 1-23/3 * * *", AutoTodayWorkshopPlanCompletionRecord) // 1:00, 4:00, 每三个小时运行一次
task.AddTask("autoTodayWorkshopPlanCompletionRecord", autoTodayWorkshopPlanCompletionRecord)
syncProductPlan := task.NewTask("定时同步车间计划", "0 */5 * * * *", SyncProductPlan)
task.AddTask("SyncProductPlan", syncProductPlan)
}
func (crontabService *CrontabService) StartCrontabTask() {
... ...
package crontab
import (
"context"
"fmt"
"github.com/linmadan/egglib-go/transaction/pg"
"gitlab.fjmaimaimai.com/allied-creation/allied-creation-manufacture/pkg/application/factory"
"gitlab.fjmaimaimai.com/allied-creation/allied-creation-manufacture/pkg/application/syncdata"
"gitlab.fjmaimaimai.com/allied-creation/allied-creation-manufacture/pkg/log"
)
func SyncProductPlan(ctx context.Context) error {
defer func() {
if r := recover(); r != nil {
log.Logger.Error(fmt.Sprintf("%v", r))
}
}()
transactionContext, err := factory.CreateTransactionContext(nil)
if err != nil {
return err
}
if err := transactionContext.StartTransaction(); err != nil {
return err
}
defer func() {
if err != nil {
log.Logger.Error("【定时同步车间计划】 失败:" + err.Error())
}
transactionContext.RollbackTransaction()
}()
log.Logger.Debug("【定时同步车间计划】 启动")
pullK3CloudService := syncdata.PullDataK3CloudService{}
if err := pullK3CloudService.SyncDataProductPlan(transactionContext.(*pg.TransactionContext)); err != nil {
log.Logger.Error(err.Error())
return nil
}
if err = transactionContext.CommitTransaction(); err != nil {
return err
}
return nil
}
... ...
... ... @@ -2,7 +2,9 @@ package syncdata
import (
"gitlab.fjmaimaimai.com/allied-creation/allied-creation-manufacture/pkg/constant"
"gitlab.fjmaimaimai.com/allied-creation/allied-creation-manufacture/pkg/domain"
"gitlab.fjmaimaimai.com/allied-creation/allied-creation-manufacture/pkg/infrastructure/domainService"
"gitlab.fjmaimaimai.com/allied-creation/allied-creation-manufacture/pkg/infrastructure/redis"
"strconv"
"strings"
"time"
... ... @@ -437,19 +439,110 @@ func (srv *PullDataK3CloudService) PullPrdMo(timeFilter time.Time) error {
return application.ThrowError(application.INTERNAL_SERVER_ERROR, err.Error())
}
//var userService = domainService.NewUserService()
//org, err := userService.Organization(constant.MANUFACTURE_DEFAULT_ORGID)
//if err != nil {
// return application.ThrowError(application.INTERNAL_SERVER_ERROR, err.Error())
//}
//同步ProductPlan 表数据
//err = prdMoDao.SyncDataProductPlan(version, constant.MANUFACTURE_DEFAULT_COMPANYID, constant.MANUFACTURE_DEFAULT_ORGID, org.OrgName)
//if err != nil {
// return application.ThrowError(application.INTERNAL_SERVER_ERROR, err.Error())
//}
if err = transactionContext.CommitTransaction(); err != nil {
return application.ThrowError(application.TRANSACTION_ERROR, err.Error())
}
return nil
}
// 同步生产计划
func (srv *PullDataK3CloudService) SyncDataProductPlan(ptr *pgTransaction.TransactionContext) error {
/*
1.获取更新时间
2.获取prd_mo_k3cloud从更新时间开始的有变化的数据
3.查询是否有重复的批次号
4.有进行更新,其他的插入
*/
lastTime, err := redis.GetLastK3CloudFetchTime()
if err != nil {
return err
}
prdMoK3cloudDao, _ := dao.NewPrdMoK3cloudDao(ptr)
records, err := prdMoK3cloudDao.GetLatestData(lastTime)
if err != nil {
return err
}
var userService = domainService.NewUserService()
org, err := userService.Organization(constant.MANUFACTURE_DEFAULT_ORGID)
if err != nil {
return application.ThrowError(application.INTERNAL_SERVER_ERROR, err.Error())
}
var (
cid = constant.MANUFACTURE_DEFAULT_COMPANYID
oid = constant.MANUFACTURE_DEFAULT_ORGID
workshop *domain.Workshop
exists bool
)
//同步ProductPlan 表数据
err = prdMoDao.SyncDataProductPlan(version, constant.MANUFACTURE_DEFAULT_COMPANYID, constant.MANUFACTURE_DEFAULT_ORGID, org.OrgName)
if err != nil {
return application.ThrowError(application.INTERNAL_SERVER_ERROR, err.Error())
workshops, _ := factory.FastPgWorkshops(ptr, cid)
productPlanRepository, _, _ := factory.FastPgProductPlan(ptr, 0)
productRepository, _, _ := factory.FastPgProduct(ptr, 0)
for _, v := range records {
if workshop, exists = workshops.ExistsWorkshops(v.WorkShopName); !exists {
continue
}
var plan *domain.ProductPlan
var product *domain.Product
// 产品信息
product, err = productRepository.FindOne(map[string]interface{}{"companyId": cid, "orgId": oid, "productCode": v.MaterialNumber})
if err != nil || err == domain.ErrorNotFound {
continue
}
// 计划信息
plan, err = productPlanRepository.FindOne(map[string]interface{}{"companyId": cid, "orgId": oid, "batchNumber": v.BillNo})
if err == nil && plan != nil {
plan.UpdatedAt = time.Now()
continue
}
if err == domain.ErrorNotFound {
plan = &domain.ProductPlan{
CompanyId: cid,
OrgId: oid,
BatchNumber: v.BillNo,
ProductDate: v.PlanStartDate,
Workshop: workshop.CloneSample(),
WorkOn: 0,
Machine: "",
PlanStatus: domain.PlanOffline,
WorkStation: &domain.WorkStation{},
CreatedAt: time.Now(),
UpdatedAt: time.Now(),
}
}
// 更新数据
plan.PlanDevoted = &domain.UnitQuantity{
Unit: product.ProductSpec.Unit,
Quantity: 0,
Weight: product.ProductWeigh(0),
UnitWeight: product.ProductSpec.UnitWeight,
}
plan.Workshop = workshop.CloneSample()
plan.UpdatedAt = time.Now()
plan.Ext = domain.NewExt(org.OrgName).WithProductPlanExt(&domain.ProductPlanExt{
ProductId: product.ProductId,
ProductCode: product.ProductCode,
ProductName: product.ProductName,
DevotedUnit: "份",
})
plan.PlanProductName = product.ProductName
plan.Remark = v.Description
if plan, err = productPlanRepository.Save(plan); err != nil {
return err
}
if err = transactionContext.CommitTransaction(); err != nil {
return application.ThrowError(application.TRANSACTION_ERROR, err.Error())
}
return nil
}
... ...
... ... @@ -44,6 +44,19 @@ func (m Workshops) FindWorkshopsByName(workshopName string) []int {
return result
}
func (m Workshops) ExistsWorkshops(workshopName string) (*Workshop, bool) {
if len(workshopName) == 0 {
return nil, false
}
for i := range m {
item := m[i]
if strings.EqualFold(item.WorkshopName, workshopName) {
return item, true
}
}
return nil, false
}
func (m Workshops) FindProductLinesByName(lineName string) []int {
result := make([]int, 0)
if len(lineName) == 0 {
... ...
... ... @@ -4,6 +4,7 @@ import (
"fmt"
"strconv"
"strings"
"time"
pgTransaction "github.com/linmadan/egglib-go/transaction/pg"
"gitlab.fjmaimaimai.com/allied-creation/allied-creation-manufacture/pkg/infrastructure/pg/models"
... ... @@ -156,3 +157,16 @@ func (d *PrdMoK3cloudDao) SyncDataProductPlan(version int64, companyId, orgId in
_, err := d.transactionContext.PgTx.Exec(sql, companyId, orgId, version)
return err
}
// 增量获取数据
func (d *PrdMoK3cloudDao) GetLatestData(fromTime time.Time) ([]*models.PrdMoK3cloud, error) {
m := new(models.PrdMoK3cloud)
result := make([]*models.PrdMoK3cloud, 0)
query := d.transactionContext.PgTx.Model(m)
query.Where("modify_date>=?", fromTime)
err := query.Select(&result)
if err != nil {
return nil, err
}
return result, nil
}
... ...
package redis
import (
"fmt"
"github.com/go-redis/redis"
"gitlab.fjmaimaimai.com/allied-creation/allied-creation-manufacture/pkg/constant"
"time"
)
// 获取每日设备运行数据
func GetLastK3CloudFetchTime() (time.Time, error) {
client := GetRedis()
key := K3CloudFetchTimeKey()
result := client.Get(key)
t, err := result.Int()
if err == redis.Nil {
if _, err := client.Set(key, time.Now().Unix(), 0).Result(); err != nil {
return time.Time{}, err
}
}
fetchTime := time.Unix(int64(t), 0)
if fetchTime.IsZero() {
return time.Time{}, fmt.Errorf("zerotime retry")
}
if _, err := client.Set(key, time.Now().Unix(), 0).Result(); err != nil {
return time.Time{}, err
}
return fetchTime, nil
}
func K3CloudFetchTimeKey() string {
str := fmt.Sprintf("%v:k3clound-data:last-fetch:%v-%v", constant.CACHE_PREFIX, constant.MANUFACTURE_DEFAULT_COMPANYID, constant.MANUFACTURE_DEFAULT_ORGID)
return str
}
... ...