asynq_run.go 1.6 KB
package task

import (
	"fmt"
	"github.com/hibiken/asynq"
	"gitlab.fjmaimaimai.com/allied-creation/allied-creation-manufacture/pkg/constant"
	"gitlab.fjmaimaimai.com/allied-creation/allied-creation-manufacture/pkg/domain"
	"gitlab.fjmaimaimai.com/allied-creation/allied-creation-manufacture/pkg/infrastructure/domainService"
	"gitlab.fjmaimaimai.com/allied-creation/allied-creation-manufacture/pkg/log"
	"os"
	"os/signal"
	"syscall"
)

func Run() {
	defer func() {
		if err := recover(); err != nil {
			fmt.Println(err)
		}
	}()
	srv := asynq.NewServer(
		asynq.RedisClientOpt{Addr: constant.REDIS_ADDRESS},
		asynq.Config{
			//Concurrency: 4,
			Queues: map[string]int{
				//"critical": 1,
				"default": 1,
				domainService.FormatQueue(domainService.QueueDevice):  1,
				domainService.FormatQueue(domainService.QueueProduct): 1,
				domainService.FormatQueue(domainService.QueueDefault): 1,
			},
			StrictPriority: true,
		},
	)

	h := asynq.NewServeMux()
	// ... Register handlers
	h.HandleFunc(domain.TaskKeyPatternProductRecordStatics(), HandlerProductRecordStatics)
	h.HandleFunc(domain.TaskKeyWorkshopWorkTimeRecordStatics(), WorkshopWorkTimeRecordStatics)
	h.HandleFunc(domain.TaskDeviceZkTecoReport(), WorkerAttendanceReport)
	h.HandleFunc(domain.TaskDeviceCollection(), WorkshopDataConsumer)
	log.Logger.Info("aysnq task start!")
	// Run blocks and waits for os signal to terminate the program.
	if err := srv.Run(h); err != nil {
		log.Logger.Error(err.Error())
	}
	sigs := make(chan os.Signal, 1)
	signal.Notify(sigs, syscall.SIGTERM, syscall.SIGINT)
	<-sigs // wait for termination signal
	log.Logger.Info("aysnq task stopping ...")
	srv.Shutdown()
}