goqueue.go 1.1 KB
package goqueue

import (
	"fmt"
	"github.com/tal-tech/go-queue/kq"
	"github.com/tal-tech/go-zero/core/logx"
	"github.com/tal-tech/go-zero/core/service"
	"gitlab.fjmaimaimai.com/allied-creation/allied-creation-user/pkg/constant"
	"gitlab.fjmaimaimai.com/allied-creation/allied-creation-user/pkg/log"
	"strings"
)

func SetUp() {
	go func() {
		q := kq.MustNewQueue(NewConfig(constant.TOPIC_UP_BLOCK_CHAIN, constant.TOPIC_UP_BLOCK_CHAIN, 2), kq.WithHandle(FilterTopicHandler("up_block_chain", UpToChainHandler)))
		defer func() {
			q.Stop()
			log.Logger.Info(fmt.Sprintf("goqueue:%v stop!", constant.TOPIC_UP_BLOCK_CHAIN))
		}()
		q.Start()
	}()
	log.Logger.Info("goqueue start!")
}

// NewConfig builds the kq.KqConf for consuming a single topic.
//
// topic is used both as the Topic field and as the service name, group fills
// the Group field, and consumers fills the Consumers field. The broker list
// is taken from constant.KAFKA_HOST, a comma-separated string. All remaining
// settings are fixed by this function.
func NewConfig(topic, group string, consumers int) kq.KqConf {
	// Trim whitespace around every address so a value written as
	// "host1:9092, host2:9092" does not produce a broker with a leading
	// space. The number of entries is left unchanged.
	brokers := strings.Split(constant.KAFKA_HOST, ",")
	for i, addr := range brokers {
		brokers[i] = strings.TrimSpace(addr)
	}

	return kq.KqConf{
		ServiceConf: service.ServiceConf{
			Name: topic,
			Log: logx.LogConf{
				Mode: "console",
			},
			Mode: "pro",
		},
		Brokers:    brokers,
		Group:      group,
		Topic:      topic,
		Offset:     "first",
		Conns:      1,
		Consumers:  consumers,
		Processors: 4,
		// NOTE(review): 10200 is close to, but not equal to, 10240 (10 KiB);
		// confirm the value is intentional.
		MinBytes: 10200,
		MaxBytes: 10485760, // 10 * 1024 * 1024
	}
}