作者 庄敏学

kafka

@@ -6,6 +6,7 @@ import ( @@ -6,6 +6,7 @@ import (
6 "gitlab.fjmaimaimai.com/allied-creation/performance/pkg/constant" 6 "gitlab.fjmaimaimai.com/allied-creation/performance/pkg/constant"
7 "gitlab.fjmaimaimai.com/allied-creation/performance/pkg/log" 7 "gitlab.fjmaimaimai.com/allied-creation/performance/pkg/log"
8 "gitlab.fjmaimaimai.com/allied-creation/performance/pkg/port/consumer/handle" 8 "gitlab.fjmaimaimai.com/allied-creation/performance/pkg/port/consumer/handle"
  9 + "strings"
9 ) 10 )
10 11
11 func Run() { 12 func Run() {
@@ -13,8 +14,13 @@ func Run() { @@ -13,8 +14,13 @@ func Run() {
13 messageHandlerMap["demo-v1"] = Demo 14 messageHandlerMap["demo-v1"] = Demo
14 //"指定topic" => 对应的处理方法 15 //"指定topic" => 对应的处理方法
15 messageHandlerMap[constant.KAFKA_BUSINESS_TOPIC] = handle.SyncDataBusinessAdmin 16 messageHandlerMap[constant.KAFKA_BUSINESS_TOPIC] = handle.SyncDataBusinessAdmin
16 - log.Logger.Debug("kafka host: " + constant.KAFKA_HOSTS + " group id:" + constant.KAFKA_GROUP_ID)  
17 - err := saramaConsumer.StartConsume(constant.KAFKA_HOSTS, constant.KAFKA_GROUP_ID, messageHandlerMap, log.Logger) 17 + hosts := strings.Split(constant.KAFKA_HOSTS, ",")
  18 + var host string
  19 + if len(hosts) > 0 {
  20 + host = hosts[0]
  21 + }
  22 + log.Logger.Debug("kafka host: " + host + " topic:" + constant.KAFKA_BUSINESS_TOPIC + " group id:" + constant.KAFKA_GROUP_ID)
  23 + err := saramaConsumer.StartConsume(host, constant.KAFKA_GROUP_ID, messageHandlerMap, log.Logger)
18 log.Logger.Error(err.Error()) 24 log.Logger.Error(err.Error())
19 } 25 }
20 26