sarama.go
971 字节
package consumer
import (
"github.com/Shopify/sarama"
saramaConsumer "github.com/linmadan/egglib-go/mom/kafka/sarama"
"gitlab.fjmaimaimai.com/allied-creation/performance/pkg/constant"
"gitlab.fjmaimaimai.com/allied-creation/performance/pkg/log"
"gitlab.fjmaimaimai.com/allied-creation/performance/pkg/port/consumer/handle"
"strings"
)
// Run registers the Kafka message handlers of this service and hands them to
// saramaConsumer.StartConsume together with the configured host and consumer
// group id. A non-nil error returned by StartConsume is logged at error level;
// Run itself returns nothing.
func Run() {
	// Maps a topic name to the handler invoked for messages on that topic.
	messageHandlerMap := make(map[string]func(message *sarama.ConsumerMessage) error)
	//messageHandlerMap["demo-v1"] = Demo
	messageHandlerMap[constant.KAFKA_BUSINESS_TOPIC] = handle.SyncDataBusinessAdmin

	// KAFKA_HOSTS is treated as a comma-separated list; only its first entry
	// is passed to StartConsume.
	// NOTE(review): confirm whether StartConsume accepts the full list, in
	// which case the remaining hosts are currently being dropped.
	hosts := strings.Split(constant.KAFKA_HOSTS, ",")
	var host string
	if len(hosts) > 0 {
		host = hosts[0]
	}

	log.Logger.Debug("kafka host: " + host + " topic:" + constant.KAFKA_BUSINESS_TOPIC + " group id:" + constant.KAFKA_GROUP_ID)

	// Only log when StartConsume actually reports a failure: calling
	// err.Error() on a nil error would panic.
	if err := saramaConsumer.StartConsume(host, constant.KAFKA_GROUP_ID, messageHandlerMap, log.Logger); err != nil {
		log.Logger.Error(err.Error())
	}
}