作者 tangxuhui

是定时拉取合并金蝶k3cloud 物料数据

... ... @@ -2,12 +2,14 @@ package main
import (
"fmt"
"github.com/beego/beego/v2/server/web"
"github.com/linmadan/egglib-go/log/logrus"
"gitlab.fjmaimaimai.com/allied-creation/allied-creation-manufacture/pkg/constant"
"gitlab.fjmaimaimai.com/allied-creation/allied-creation-manufacture/pkg/infrastructure/redis"
"gitlab.fjmaimaimai.com/allied-creation/allied-creation-manufacture/pkg/log"
"gitlab.fjmaimaimai.com/allied-creation/allied-creation-manufacture/pkg/application/crontab"
_ "gitlab.fjmaimaimai.com/allied-creation/allied-creation-manufacture/pkg/constant"
_ "gitlab.fjmaimaimai.com/allied-creation/allied-creation-manufacture/pkg/infrastructure/pg"
_ "gitlab.fjmaimaimai.com/allied-creation/allied-creation-manufacture/pkg/infrastructure/redis"
... ... @@ -30,7 +32,11 @@ func main() {
})
log.Logger.AddHook(bw)
redis.InitRedis()
//设置定时任务
cron := crontab.NewCrontabService(nil)
cron.StartCrontabTask()
defer cron.StopCrontabTask()
//
log.Logger.Info("server start!")
web.Run()
}
... ...
package crontab
import (
"context"
"gitlab.fjmaimaimai.com/allied-creation/allied-creation-manufacture/pkg/log"
"github.com/beego/beego/v2/task"
"gitlab.fjmaimaimai.com/allied-creation/allied-creation-manufacture/pkg/application/syncdata"
)
type CrontabService struct {
}
func NewCrontabService(options map[string]interface{}) *CrontabService {
newCrontabService := &CrontabService{}
return newCrontabService
}
func (crontabService *CrontabService) initTask() {
//PullDataK3Cloud 晚上0时10分执行
pullMaterialK3cloud := task.NewTask("pullMaterialK3cloud", "0 10 0 * * *", func(ctx context.Context) error {
srv := syncdata.PullDataK3CloudService{}
return srv.PullMaterialNewest()
})
task.AddTask("pullMaterialK3cloud", pullMaterialK3cloud)
}
func (crontabService *CrontabService) StartCrontabTask() {
crontabService.initTask()
task.StartTask()
log.Logger.Info("crontab start!")
}
func (crontabService *CrontabService) StopCrontabTask() {
task.StopTask()
}
... ...
package syncdata
import (
"strconv"
"strings"
"time"
"github.com/linmadan/egglib-go/core/application"
pgTransaction "github.com/linmadan/egglib-go/transaction/pg"
"gitlab.fjmaimaimai.com/allied-creation/allied-creation-manufacture/pkg/application/factory"
"gitlab.fjmaimaimai.com/allied-creation/allied-creation-manufacture/pkg/infrastructure/dao"
"gitlab.fjmaimaimai.com/allied-creation/allied-creation-manufacture/pkg/infrastructure/pg/models"
"gitlab.fjmaimaimai.com/allied-creation/allied-creation-manufacture/pkg/infrastructure/utils/k3cloud"
)
//拉取金蝶k3cloud的数据,并更新本地数据库
type PullDataK3CloudService struct {
}
func newK3cloudClient() (*k3cloud.Client, error) {
// TODO 使用配置方式传入
var (
acctID = "20211118121754866"
username = "18559023318"
password = "stx@123456"
hostUrl = "https://tianlian.test.ik3cloud.com/k3cloud"
)
client, err := k3cloud.NewClient(hostUrl, acctID, username, password)
return client, err
}
func (srv *PullDataK3CloudService) PullMaterialNewest() error {
var (
err error
materialDao *dao.MaterialK3cloudDao
)
transactionContext, err := factory.CreateTransactionContext(nil)
if err != nil {
return application.ThrowError(application.TRANSACTION_ERROR, err.Error())
}
if err := transactionContext.StartTransaction(); err != nil {
return application.ThrowError(application.TRANSACTION_ERROR, err.Error())
}
defer func() {
transactionContext.RollbackTransaction()
}()
materialDao, err = dao.NewMaterialK3cloudDao(transactionContext.(*pgTransaction.TransactionContext))
if err != nil {
return application.ThrowError(application.INTERNAL_SERVER_ERROR, err.Error())
}
version, err := materialDao.GetLastVersion()
if err != nil {
return application.ThrowError(application.BUSINESS_ERROR, err.Error())
}
if err = transactionContext.CommitTransaction(); err != nil {
return application.ThrowError(application.TRANSACTION_ERROR, err.Error())
}
var timeFilter time.Time
if version > 0 {
timeFilter = time.Unix(version, 0)
}
err = srv.PullMaterial(timeFilter)
if err != nil {
return err
}
return err
}
//PullMaterial 拉取物料数据
func (srv *PullDataK3CloudService) PullMaterial(timeFilter time.Time) error {
//拉取数据
var filterString []string
if !timeFilter.IsZero() {
str := timeFilter.Format("2006-01-02T15:04:05")
filterString = append(filterString, "FModifyDate>='"+str+"'")
}
client, err := newK3cloudClient()
if err != nil {
return application.ThrowError(application.INTERNAL_SERVER_ERROR, err.Error())
}
fieldKeys := []string{
"FMATERIALID", "FSpecification", "FName", "FNumber",
"FDocumentStatus", "FForbidStatus", "FErpClsID",
"FBaseUnitId", "FBaseUnitId.FName", "FCreateDate", "FModifyDate",
"FForbidDate", "FApproveDate", "FMaterialGroup", "FMaterialGroup.FName",
"FRefStatus", "FMaterialGroup.FNumber", "FUseOrgId",
}
var (
startRow int
allResult []map[string]string
queryErr error
)
for {
result, err := client.ExecuteBillQuery(k3cloud.RequestExecuteBillQuery{
FormId: "BD_MATERIAL",
Data: k3cloud.ExecuteBillQueryData{
FormId: "BD_MATERIAL",
FieldKeys: strings.Join(fieldKeys, ","), //查询的字段
StartRow: startRow,
Limit: 1000,
FilterString: strings.Join(filterString, " and "),
},
})
if err != nil {
queryErr = err
break
}
mp := result.ToMapString()
if len(mp) == 0 {
break
}
allResult = append(allResult, mp...)
startRow += 1000
}
if queryErr != nil {
return application.ThrowError(application.INTERNAL_SERVER_ERROR, err.Error())
}
var (
materialModels []models.MaterialK3cloud
materialTemp models.MaterialK3cloud
)
nowTime := time.Now()
for _, item := range allResult {
materialId, err := strconv.Atoi(item["FMATERIALID"])
if err != nil {
return application.ThrowError(application.INTERNAL_SERVER_ERROR, err.Error())
}
fErpClsID, _ := strconv.Atoi(item["FErpClsID"])
fBaseUnitId, _ := strconv.Atoi(item["FBaseUnitId"])
materialGroup, _ := strconv.Atoi(item["MaterialGroup"])
fUseOrgId, _ := strconv.Atoi(item["FUseOrgId"])
refStatus, _ := strconv.Atoi(item["RefStatus"])
fCreateDate, _ := time.Parse("2006-01-02T15:04:05.999", item["FCreateDate"])
fModifyDate, _ := time.Parse("2006-01-02T15:04:05.999", item["FModifyDate"])
fForbidDate, _ := time.Parse("2006-01-02T15:04:05.999", item["FForbidDate"])
fApproveDate, _ := time.Parse("2006-01-02T15:04:05.999", item["FApproveDate"])
materialTemp = models.MaterialK3cloud{
MaterialId: materialId,
Name: item["FName"],
Number: item["FNumber"],
Specification: item["FSpecification"],
ForbidStatus: item["FForbidStatus"],
ErpClsId: fErpClsID,
BaseUnitId: fBaseUnitId,
BaseUnitName: item["FBaseUnitId.FName"],
CreateDate: fCreateDate,
ModifyDate: fModifyDate,
ForbidDate: fForbidDate,
ApproveDate: fApproveDate,
MaterialGroup: materialGroup,
MaterialGroupNumber: item["FMaterialGroup.FNumber"],
MaterialGroupName: item["FMaterialGroup.FName"],
RefStatus: refStatus,
UseOrgId: fUseOrgId,
JoinProductId: 0,
DataVersion: nowTime.Unix(),
}
materialModels = append(materialModels, materialTemp)
}
var (
materialDao *dao.MaterialK3cloudDao
)
transactionContext, err := factory.CreateTransactionContext(nil)
if err != nil {
return application.ThrowError(application.TRANSACTION_ERROR, err.Error())
}
if err := transactionContext.StartTransaction(); err != nil {
return application.ThrowError(application.TRANSACTION_ERROR, err.Error())
}
defer func() {
transactionContext.RollbackTransaction()
}()
materialDao, err = dao.NewMaterialK3cloudDao(transactionContext.(*pgTransaction.TransactionContext))
if err != nil {
return application.ThrowError(application.INTERNAL_SERVER_ERROR, err.Error())
}
//同步MaterialK3cloud表数据
err = materialDao.SyncDataMaterialK3cloud(materialModels)
if err != nil {
return application.ThrowError(application.INTERNAL_SERVER_ERROR, err.Error())
}
//MaterialK3cloud表数据到Proudct表
err = materialDao.SyncDataProudct(nowTime.Unix())
if err != nil {
return application.ThrowError(application.INTERNAL_SERVER_ERROR, err.Error())
}
if err = transactionContext.CommitTransaction(); err != nil {
return application.ThrowError(application.TRANSACTION_ERROR, err.Error())
}
return nil
}
... ...
/postgresql_local.go
... ...
//go:build !local
package constant
import (
... ...
package dao
import (
"fmt"
"strconv"
"strings"
pgTransaction "github.com/linmadan/egglib-go/transaction/pg"
"gitlab.fjmaimaimai.com/allied-creation/allied-creation-manufacture/pkg/infrastructure/pg/models"
)
... ... @@ -9,6 +13,16 @@ type MaterialK3cloudDao struct {
transactionContext *pgTransaction.TransactionContext
}
func NewMaterialK3cloudDao(transactionContext *pgTransaction.TransactionContext) (*MaterialK3cloudDao, error) {
if transactionContext == nil {
return nil, fmt.Errorf("transactionContext参数不能为nil")
} else {
return &MaterialK3cloudDao{
transactionContext: transactionContext,
}, nil
}
}
//SyncDataMaterialK3cloud 同步MaterialK3cloud表数据
func (d *MaterialK3cloudDao) SyncDataMaterialK3cloud(data []models.MaterialK3cloud) error {
// -- 插入或者更新
... ... @@ -35,25 +49,118 @@ func (d *MaterialK3cloudDao) SyncDataMaterialK3cloud(data []models.MaterialK3clo
// EXCLUDED."forbid_date",EXCLUDED."approve_date",EXCLUDED."material_group",
// EXCLUDED."material_group_number",EXCLUDED."material_group_name",
// EXCLUDED."ref_status ",EXCLUDED."data_version" )
sqlValues := []string{}
var strTemp []string
for i := range data {
strTemp = make([]string, 0, 18)
strTemp = append(strTemp, strconv.Itoa(data[i].MaterialId))
strTemp = append(strTemp, `'`+data[i].Name+`'`)
strTemp = append(strTemp, `'`+data[i].Number+`'`)
strTemp = append(strTemp, `'`+data[i].Specification+`'`)
strTemp = append(strTemp, `'`+data[i].ForbidStatus+`'`)
strTemp = append(strTemp, strconv.Itoa(data[i].ErpClsId))
strTemp = append(strTemp, strconv.Itoa(data[i].BaseUnitId))
strTemp = append(strTemp, `'`+data[i].BaseUnitName+`'`)
if data[i].CreateDate.IsZero() {
strTemp = append(strTemp, `NULL`)
} else {
strTemp = append(strTemp, `'`+data[i].CreateDate.String()+`'`)
}
if data[i].ModifyDate.IsZero() {
strTemp = append(strTemp, `NULL`)
} else {
strTemp = append(strTemp, `'`+data[i].ModifyDate.String()+`'`)
}
if data[i].ForbidDate.IsZero() {
strTemp = append(strTemp, `NULL`)
} else {
strTemp = append(strTemp, `'`+data[i].ForbidDate.String()+`'`)
}
if data[i].ApproveDate.IsZero() {
strTemp = append(strTemp, `NULL`)
} else {
strTemp = append(strTemp, `'`+data[i].ApproveDate.String()+`'`)
}
strTemp = append(strTemp, strconv.Itoa(data[i].MaterialGroup))
strTemp = append(strTemp, `'`+data[i].MaterialGroupNumber+`'`)
strTemp = append(strTemp, `'`+data[i].MaterialGroupName+`'`)
strTemp = append(strTemp, strconv.Itoa(data[i].RefStatus))
//关联的产品表id ,使用 product 产品表的自增序列表
strTemp = append(strTemp, "nextval('manufacture.product_product_id_seq'::regclass)")
strTemp = append(strTemp, strconv.Itoa(int(data[i].DataVersion)))
strTemp = append(strTemp, strconv.Itoa(data[i].UseOrgId))
sqlValues = append(sqlValues, "("+strings.Join(strTemp, ",")+")")
}
var valueTemp []string
for i := 0; i < len(sqlValues); i += 100 {
if i < len(sqlValues)-100 {
valueTemp = sqlValues[i:100]
} else {
valueTemp = sqlValues[i:]
}
sql := `INSERT INTO "manufacture"."material_k3cloud" (
"material_id","name","number","specification","forbid_status",
"erp_cls_id","base_unit_id","base_unit_name","create_date",
"modify_date","forbid_date","approve_date","material_group",
"material_group_number","material_group_name","ref_status ",
"join_product_id","data_version","use_org_id" )
VALUES ` + strings.Join(valueTemp, ",") +
` ON conflict ( material_id ) DO
UPDATE
SET (
"name","number","specification","forbid_status","erp_cls_id",
"base_unit_id","base_unit_name","create_date","modify_date",
"forbid_date","approve_date","material_group","material_group_number",
"material_group_name","ref_status ","data_version" ) = (
EXCLUDED."name",EXCLUDED."number",EXCLUDED."specification",
EXCLUDED."forbid_status",EXCLUDED."erp_cls_id",EXCLUDED."base_unit_id",
EXCLUDED."base_unit_name",EXCLUDED."create_date",EXCLUDED."modify_date",
EXCLUDED."forbid_date",EXCLUDED."approve_date",EXCLUDED."material_group",
EXCLUDED."material_group_number",EXCLUDED."material_group_name",
EXCLUDED."ref_status ",EXCLUDED."data_version" )`
_, err := d.transactionContext.PgTx.Exec(sql)
if err != nil {
return err
}
}
return nil
}
//SyncDataProudct 同步Proudct表数据
func (d *MaterialK3cloudDao) SyncDataProudct(version int) error {
//SyncDataProudct 同步MaterialK3cloud表数据到Proudct表
func (d *MaterialK3cloudDao) SyncDataProudct(version int64) error {
// -- 插入或者更新
// INSERT INTO "manufacture"."product"(
// "company_id", "org_id", "product_id", "product_code", "product_name",
// "product_category", "product_spec", "created_at", "updated_at"
// )
// SELECT 0,0,"join_product_id","number","name","material_group_name",'{}',now(),now()
// FROM "manufacture"."material_k3cloud" WHERE "data_version"=0000
// ON conflict ( product_id ) DO
// UPDATE
// SET (
// "company_id", "org_id", "product_id", "product_code", "product_name",
// "product_category", "product_spec", "created_at", "updated_at")=(
// EXCLUDED."company_id", EXCLUDED."org_id",EXCLUDED."product_id",
// EXCLUDED."product_code",EXCLUDED."product_name",EXCLUDED."product_category",
// EXCLUDED."product_spec", EXCLUDED."created_at", EXCLUDED."updated_at")
return nil
sql := `INSERT INTO "manufacture"."product"(
"company_id", "org_id", "product_id", "product_code", "product_name",
"product_category", "product_spec", "created_at", "updated_at"
)
SELECT 0,use_org_id,"join_product_id","number","name","material_group_name",
json_build_object('unit',specification),now(),now()
FROM "manufacture"."material_k3cloud" WHERE "data_version"=?
ON conflict ( product_id ) DO
UPDATE
SET (
"company_id", "org_id", "product_id", "product_code", "product_name",
"product_category", "product_spec", "created_at", "updated_at")=(
EXCLUDED."company_id", EXCLUDED."org_id",EXCLUDED."product_id",
EXCLUDED."product_code",EXCLUDED."product_name",EXCLUDED."product_category",
EXCLUDED."product_spec", EXCLUDED."updated_at") `
_, err := d.transactionContext.PgTx.Exec(sql, version)
return err
}
func (d *MaterialK3cloudDao) GetLastVersion() (int64, error) {
var materialData []models.MaterialK3cloud
err := d.transactionContext.PgTx.Model(&materialData).
Order("data_version DESC").
Limit(1).
Select()
if err != nil {
return 0, err
}
if len(materialData) == 0 {
return 0, nil
}
return materialData[0].DataVersion, nil
}
... ...
... ... @@ -8,7 +8,7 @@ type DataLogK3cloud struct {
FormId string `comment:"formId" pg:"form_id"`
RequestParam string `comment:"请求参数" pg:"request_param"`
BeginAt int64 `comment:"开始时间" pg:"begin_at"`
EndAt int64 `comment:"结束时间" pg:"begin_at"`
EndAt int64 `comment:"结束时间" pg:"end_at"`
DataVersion int64 `comment:"数据版本" pg:"data_version"`
IsSuccess int `comment:"是否成功" pg:"is_success"`
Error string `comment:"错误信息" pg:"error"`
... ...
package models
import "time"
//MaterialK3cloud 采集自金蝶k3cloud物料数据
type MaterialK3cloud struct {
tableName string `comment:"采集自金蝶k3cloud物料数据" pg:"manufacture.material_k3cloud,alias:material_k3cloud"`
MaterialId int64 `pg:",pk"`
Name string `comment:"物料名称" pg:"name"`
Number string `comment:"物料编码" pg:"number"`
Specification string `comment:"规格型号" pg:"specification"`
ForbidStatus string `comment:"禁用状态,"A":否,"B":是" pg:"forbid_status"`
ErpClsId int `comment:"物料属性" pg:"erp_cls_id"`
BaseUnitId int `comment:"基本单位" pg:"base_unit_id"`
BaseUnitName string `comment:"基本单位名称" pg:"base_unit_name"`
CreateDate string `comment:"创建时间" pg:"create_date"`
ModifyDate string `comment:"修改时间" pg:"modify_date"`
ForbidDate string `comment:"禁用时间" pg:"forbid_date"`
ApproveDate string `comment:"审核时间" pg:"approve_date"`
MaterialGroup int `comment:"物料分组" pg:"material_group"`
MaterialGroupNumber string `comment:"物料分组编码" pg:"material_group_number"`
MaterialGroupName string `comment:"物料分组名称" pg:"material_group_name"`
RefStatus int `comment:"是否使用" pg:"ref_status"`
JoinProductId int64 `comment:"关联的product表id" pg:"join_product_id"`
DataVersion int64 `comment:"数据版本" pg:"data_version"`
tableName string `comment:"采集自金蝶k3cloud物料数据" pg:"manufacture.material_k3cloud,alias:material_k3cloud"`
MaterialId int `pg:",pk"`
Name string `comment:"物料名称" pg:"name"`
Number string `comment:"物料编码" pg:"number"`
Specification string `comment:"规格型号" pg:"specification"`
ForbidStatus string `comment:"禁用状态,"A":否,"B":是" pg:"forbid_status"`
ErpClsId int `comment:"物料属性" pg:"erp_cls_id"`
BaseUnitId int `comment:"基本单位" pg:"base_unit_id"`
BaseUnitName string `comment:"基本单位名称" pg:"base_unit_name"`
CreateDate time.Time `comment:"创建时间" pg:"create_date,type:timestamp"`
ModifyDate time.Time `comment:"修改时间" pg:"modify_date,type:timestamp"`
ForbidDate time.Time `comment:"禁用时间" pg:"forbid_date,type:timestamp"`
ApproveDate time.Time `comment:"审核时间" pg:"approve_date,type:timestamp"`
MaterialGroup int `comment:"物料分组" pg:"material_group"`
MaterialGroupNumber string `comment:"物料分组编码" pg:"material_group_number"`
MaterialGroupName string `comment:"物料分组名称" pg:"material_group_name"`
RefStatus int `comment:"是否使用" pg:"ref_status"`
UseOrgId int `comment:"使用组织" pg:"use_org_id"`
JoinProductId int64 `comment:"关联的product表id" pg:"join_product_id"`
DataVersion int64 `comment:"数据版本" pg:"data_version"`
}
//批量处理sql脚本样例
... ...
... ... @@ -34,12 +34,13 @@ func TestExecuteBillQuery(t *testing.T) {
// {"FApproveDate", "审核日期"}, {"FOldNumber", "旧物料编码"}, {"FMaterialGroup", "物料分组"}, {"FPLMMaterialId", "PLM物料内码"}, {"FMaterialSRC", "物料来源"},
// {"FIsSalseByNet", "是否网销"}, {"FIsAutoAllocate", "自动分配"}, {"FSPUID", "SPU信息"}, {"FPinYin", "拼音"}, {"FDSMatchByLot", "按批号匹配供需"},
// {"FForbidReson", "禁用原因"}, {"FRefStatus", "已使用"}}
//FMATERIALID,FSpecification,FName,FNumber,FModifyDate,FBaseUnitId.FName,FUseOrgId.FName
result, err := client.ExecuteBillQuery(RequestExecuteBillQuery{
FormId: "BD_MATERIAL",
Data: ExecuteBillQueryData{
FormId: "BD_MATERIAL",
FieldKeys: "FMATERIALID,FSpecification,FName,FNumber,FModifyDate,FBaseUnitId.FName,FUseOrgId.FName", //查询的字段
TopRowCount: 5,
FieldKeys: "FMATERIALID,FSpecification,FName,FNumber,FModifyDate,FBaseUnitId.FName,FUseOrgId,FUseOrgId.FName", //查询的字段
TopRowCount: 0,
FilterString: `FMaterialGroup.FNumber like '05%' and FModifyDate<'2022-01-08T19:36:06'`,
},
})
... ...