asynq_run.go 1.9 KB
package task

import (
	"context"
	"encoding/json"
	"fmt"
	"github.com/hibiken/asynq"
	"gitlab.fjmaimaimai.com/allied-creation/allied-creation-manufacture/pkg/application/productRecord/command"
	"gitlab.fjmaimaimai.com/allied-creation/allied-creation-manufacture/pkg/application/productRecord/service"
	"gitlab.fjmaimaimai.com/allied-creation/allied-creation-manufacture/pkg/constant"
	"gitlab.fjmaimaimai.com/allied-creation/allied-creation-manufacture/pkg/domain"
	"gitlab.fjmaimaimai.com/allied-creation/allied-creation-manufacture/pkg/log"
	"os"
	"os/signal"
	"syscall"
)

// Run starts the asynq worker server for the statistics tasks and blocks until
// the process receives SIGTERM or SIGINT, after which it shuts the server down.
//
// The server is started with Start rather than Run on purpose: Server.Run
// performs its own signal wait and shutdown before returning, so following it
// with another signal wait and Shutdown would require a second signal to exit
// and would shut the server down twice.
//
// If the server cannot be started the error is logged and Run returns
// immediately instead of blocking on a signal for a server that is not running.
func Run() {
	srv := asynq.NewServer(
		asynq.RedisClientOpt{Addr: constant.REDIS_ADDRESS},
		asynq.Config{Concurrency: 1},
	)

	// Route each task type to its handler.
	mux := asynq.NewServeMux()
	mux.HandleFunc(domain.TaskKeyPatternProductRecordStatics(), HandlerProductRecordStatics)
	mux.HandleFunc(domain.TaskKeyWorkshopWorkTimeRecordStatics(), WorkshopWorkTimeRecordStatics)

	// Subscribe before starting so a signal delivered during start-up is not lost.
	sigs := make(chan os.Signal, 1)
	signal.Notify(sigs, syscall.SIGTERM, syscall.SIGINT)
	defer signal.Stop(sigs)

	// Start is non-blocking; it returns once the workers are running.
	if err := srv.Start(mux); err != nil {
		log.Logger.Error(err.Error())
		return
	}
	log.Logger.Info("aysnq task running ...")

	<-sigs // wait for termination signal
	log.Logger.Info("aysnq task stopping ...")
	srv.Shutdown()
}

// HandlerProductRecordStatics is the asynq handler for product record
// statistics tasks. It decodes the task payload as JSON into a
// ProductRecordStaticsCommand, logs the consumed record at debug level, and
// passes the command to the product record service's ProductRecordStatics.
//
// A payload that cannot be decoded is logged and returned wrapped with
// asynq.SkipRetry: decoding the same bytes again cannot succeed, so retrying
// the task would only repeat the failure. Errors from the service are logged
// and returned unchanged, leaving the retry decision to asynq.
func HandlerProductRecordStatics(c context.Context, t *asynq.Task) error {
	cmd := &command.ProductRecordStaticsCommand{}
	if err := json.Unmarshal(t.Payload(), cmd); err != nil {
		// %w must wrap SkipRetry so asynq can detect it; the decode error is kept as text.
		err = fmt.Errorf("decoding product record statics payload: %v: %w", err, asynq.SkipRetry)
		log.Logger.Error(err.Error())
		return err
	}
	log.Logger.Debug(fmt.Sprintf("【生产记录统计】 消费 生产记录ID:%v 类型:%v 工段:%v(%v) 重量:%v", cmd.ProductRecordId, cmd.ProductRecordType,
		cmd.WorkStation.SectionName, cmd.WorkStation.WorkStationId, cmd.ProductRecordInfo.Weigh))

	recordService := service.NewProductRecordService(nil)
	_, err := recordService.ProductRecordStatics(cmd)
	if err != nil {
		log.Logger.Error(err.Error())
	}
	return err
}