package consumer import ( "github.com/Shopify/sarama" saramaConsumer "github.com/linmadan/egglib-go/mom/kafka/sarama" "gitlab.fjmaimaimai.com/allied-creation/performance/pkg/constant" "gitlab.fjmaimaimai.com/allied-creation/performance/pkg/log" "gitlab.fjmaimaimai.com/allied-creation/performance/pkg/port/consumer/handle" "strings" ) func Run() { messageHandlerMap := make(map[string]func(message *sarama.ConsumerMessage) error) //messageHandlerMap["demo-v1"] = Demo //"指定topic" => 对应的处理方法 messageHandlerMap[constant.KAFKA_BUSINESS_TOPIC] = handle.SyncDataBusinessAdmin hosts := strings.Split(constant.KAFKA_HOSTS, ",") var host string if len(hosts) > 0 { host = hosts[0] } log.Logger.Debug("kafka host: " + host + " topic:" + constant.KAFKA_BUSINESS_TOPIC + " group id:" + constant.KAFKA_GROUP_ID) err := saramaConsumer.StartConsume(host, constant.KAFKA_GROUP_ID, messageHandlerMap, log.Logger) log.Logger.Error(err.Error()) }