审查视图

pkg/port/consumer/sarama.go 971 字节
tangxvhui authored
1 2 3 4 5 6 7
package consumer

import (
	"github.com/Shopify/sarama"
	saramaConsumer "github.com/linmadan/egglib-go/mom/kafka/sarama"
	"gitlab.fjmaimaimai.com/allied-creation/performance/pkg/constant"
	"gitlab.fjmaimaimai.com/allied-creation/performance/pkg/log"
tangxvhui authored
8
	"gitlab.fjmaimaimai.com/allied-creation/performance/pkg/port/consumer/handle"
庄敏学 authored
9
	"strings"
tangxvhui authored
10 11 12 13
)

func Run() {
	messageHandlerMap := make(map[string]func(message *sarama.ConsumerMessage) error)
庄敏学 authored
14
	//messageHandlerMap["demo-v1"] = Demo
tangxvhui authored
15
	//"指定topic" => 对应的处理方法
庄敏学 authored
16
	messageHandlerMap[constant.KAFKA_BUSINESS_TOPIC] = handle.SyncDataBusinessAdmin
庄敏学 authored
17 18 19 20 21 22 23
	hosts := strings.Split(constant.KAFKA_HOSTS, ",")
	var host string
	if len(hosts) > 0 {
		host = hosts[0]
	}
	log.Logger.Debug("kafka host: " + host + "  topic:" + constant.KAFKA_BUSINESS_TOPIC + "   group id:" + constant.KAFKA_GROUP_ID)
	err := saramaConsumer.StartConsume(host, constant.KAFKA_GROUP_ID, messageHandlerMap, log.Logger)
tangxvhui authored
24
	log.Logger.Error(err.Error())
tangxvhui authored
25
}