作者 yangfu

refactor:1.代码优化

... ... @@ -2,14 +2,14 @@ package main
import (
"fmt"
"gitlab.fjmaimaimai.com/allied-creation/allied-creation-manufacture/pkg/port/mqtt"
"gitlab.fjmaimaimai.com/allied-creation/allied-creation-manufacture/pkg/port/task"
"github.com/beego/beego/v2/server/web"
"github.com/linmadan/egglib-go/log/logrus"
"gitlab.fjmaimaimai.com/allied-creation/allied-creation-manufacture/pkg/constant"
"gitlab.fjmaimaimai.com/allied-creation/allied-creation-manufacture/pkg/infrastructure/redis"
"gitlab.fjmaimaimai.com/allied-creation/allied-creation-manufacture/pkg/log"
"gitlab.fjmaimaimai.com/allied-creation/allied-creation-manufacture/pkg/port/mqtt"
"gitlab.fjmaimaimai.com/allied-creation/allied-creation-manufacture/pkg/port/task"
"time"
"gitlab.fjmaimaimai.com/allied-creation/allied-creation-manufacture/pkg/application/crontab"
_ "gitlab.fjmaimaimai.com/allied-creation/allied-creation-manufacture/pkg/constant"
... ... @@ -44,6 +44,7 @@ func main() {
cron := crontab.NewCrontabService(nil)
cron.StartCrontabTask()
defer cron.StopCrontabTask()
time.Sleep(time.Second)
log.Logger.Info("server start!")
web.Run()
log.Logger.Info("server stop!")
... ...
... ... @@ -436,8 +436,15 @@ func (srv *PullDataK3CloudService) PullPrdMo(timeFilter time.Time) error {
if err != nil {
return application.ThrowError(application.INTERNAL_SERVER_ERROR, err.Error())
}
var userService = domainService.NewUserService()
org, err := userService.Organization(constant.MANUFACTURE_DEFAULT_ORGID)
if err != nil {
return application.ThrowError(application.INTERNAL_SERVER_ERROR, err.Error())
}
//同步ProductPlan 表数据
err = prdMoDao.SyncDataProductPlan(version)
err = prdMoDao.SyncDataProductPlan(version, constant.MANUFACTURE_DEFAULT_COMPANYID, constant.MANUFACTURE_DEFAULT_ORGID, org.OrgName)
if err != nil {
return application.ThrowError(application.INTERNAL_SERVER_ERROR, err.Error())
}
... ...
... ... @@ -115,24 +115,26 @@ func (d *PrdMoK3cloudDao) GetLastVersion() (int64, error) {
}
//SyncDataProductPlan 同步数据到
func (d *PrdMoK3cloudDao) SyncDataProductPlan(version int64) error {
sql := `
func (d *PrdMoK3cloudDao) SyncDataProductPlan(version int64, companyId, orgId int, orgName string) error {
sql := fmt.Sprintf(`
INSERT INTO "manufacture"."product_plan"(
"company_id","org_id",
"product_plan_id","batch_number","product_date","workshop",
"plan_product_name","plan_devoted","plan_status","remark",
"ext","created_at","updated_at"
)
SELECT prd_mo_k3cloud."join_product_plan_id",prd_mo_k3cloud."bill_no",
SELECT ?,?,
prd_mo_k3cloud."join_product_plan_id",prd_mo_k3cloud."bill_no",
prd_mo_k3cloud."plan_start_date",
json_build_object('workshopId',COALESCE("workshop"."workshop_id",0),'workshopName',prd_mo_k3cloud."work_shop_name"),
prd_mo_k3cloud."material_name", json_build_object('unit',prd_mo_k3cloud."unit_name",'quantity',prd_mo_k3cloud."qty"),
2,prd_mo_k3cloud."description",
json_build_object('productPlanExt',json_build_object('productId',prd_mo_k3cloud."join_product_plan_id",'productCode',prd_mo_k3cloud."material_number",'productName',prd_mo_k3cloud."material_name")),
json_build_object('orgName','%v','productPlanExt',json_build_object('productId',prd_mo_k3cloud."join_product_plan_id",'productCode',prd_mo_k3cloud."material_number",'productName',prd_mo_k3cloud."material_name")),
now(),now()
FROM "manufacture"."prd_mo_k3cloud"
LEFT JOIN "manufacture"."material_k3cloud" ON "prd_mo_k3cloud"."material_id"="material_k3cloud"."material_id"
LEFT JOIN "manufacture"."workshop" ON "workshop"."workshop_name" = "prd_mo_k3cloud"."work_shop_name"
WHERE prd_mo_k3cloud."data_version">=?
-- LEFT JOIN "manufacture"."material_k3cloud" ON "prd_mo_k3cloud"."material_id"="material_k3cloud"."material_id"
INNER JOIN "manufacture"."workshop" ON "workshop"."workshop_name" = "prd_mo_k3cloud"."work_shop_name"
WHERE prd_mo_k3cloud."data_version">=?
ON conflict ("product_plan_id") DO
UPDATE
SET (
... ... @@ -150,7 +152,7 @@ func (d *PrdMoK3cloudDao) SyncDataProductPlan(version int64) error {
"product_plan"."ext"||EXCLUDED."ext",
EXCLUDED."updated_at"
)
`
_, err := d.transactionContext.PgTx.Exec(sql, version)
`, orgName)
_, err := d.transactionContext.PgTx.Exec(sql, companyId, orgId, version)
return err
}
... ...
... ... @@ -31,8 +31,8 @@ func init() {
web.BConfig.Listen.HTTPSPort = 443
//进程内监控
web.BConfig.Listen.EnableAdmin = true
web.BConfig.Listen.AdminPort = 8088
//web.BConfig.Listen.EnableAdmin = true
//web.BConfig.Listen.AdminPort = 8088
if os.Getenv("HTTPS_PORT") != "" {
portStr := os.Getenv("HTTPS_PORT")
if port, err := strconv.Atoi(portStr); err == nil {
... ...
package task
import (
"fmt"
"github.com/hibiken/asynq"
"gitlab.fjmaimaimai.com/allied-creation/allied-creation-manufacture/pkg/constant"
"gitlab.fjmaimaimai.com/allied-creation/allied-creation-manufacture/pkg/domain"
... ... @@ -11,6 +12,11 @@ import (
)
func Run() {
defer func() {
if err := recover(); err != nil {
fmt.Println(err)
}
}()
srv := asynq.NewServer(
asynq.RedisClientOpt{Addr: constant.REDIS_ADDRESS},
asynq.Config{Concurrency: 1},
... ... @@ -22,7 +28,7 @@ func Run() {
h.HandleFunc(domain.TaskKeyWorkshopWorkTimeRecordStatics(), WorkshopWorkTimeRecordStatics)
h.HandleFunc(domain.TaskDeviceZkTecoReport(), WorkerAttendanceReport)
h.HandleFunc(domain.TaskDeviceCollection(), WorkshopDataConsumer)
log.Logger.Info("aysnq task running ...")
log.Logger.Info("aysnq task start!")
// Run blocks and waits for os signal to terminate the program.
if err := srv.Run(h); err != nil {
log.Logger.Error(err.Error())
... ...