作者 唐旭辉

更新配置

@@ -78,6 +78,8 @@ spec: @@ -78,6 +78,8 @@ spec:
78 value: "6DwjBO735" 78 value: "6DwjBO735"
79 - name: BUSINESS_ADMIN_HOST 79 - name: BUSINESS_ADMIN_HOST
80 value: "http://suplus-business-admin-dev.fjmaimaimai.com" 80 value: "http://suplus-business-admin-dev.fjmaimaimai.com"
  81 + - name: KAFKA_HOST
  82 + value: "106.52.15.41:9092"
81 volumes: 83 volumes:
82 - name: accesslogs 84 - name: accesslogs
83 emptyDir: {} 85 emptyDir: {}
@@ -75,6 +75,8 @@ spec: @@ -75,6 +75,8 @@ spec:
75 value: "rsF0pL!6DwjBO735" 75 value: "rsF0pL!6DwjBO735"
76 - name: BUSINESS_ADMIN_HOST 76 - name: BUSINESS_ADMIN_HOST
77 value: "http://suplus-business-admin-test.fjmaimaimai.com" 77 value: "http://suplus-business-admin-test.fjmaimaimai.com"
  78 + - name: KAFKA_HOST
  79 + value: "106.52.15.41:9092"
78 volumes: 80 volumes:
79 - name: accesslogs 81 - name: accesslogs
80 emptyDir: {} 82 emptyDir: {}
  1 +package constant
  2 +
  3 +import (
  4 + "os"
  5 + "strings"
  6 +)
  7 +
  8 +type KafkaConfig struct {
  9 + Servers []string `json:"servers"`
  10 + ConsumerId string `json:"consumerGroup"`
  11 +}
  12 +
  13 +var KafkaCfg KafkaConfig
  14 +
  15 +func init() {
  16 + KafkaCfg = KafkaConfig{
  17 + Servers: []string{"106.52.15.41:9092"},
  18 + ConsumerId: "partnermg",
  19 + }
  20 + if os.Getenv("KAFKA_HOST") != "" {
  21 + kafkaHost := os.Getenv("KAFKA_HOST")
  22 + KafkaCfg.Servers = strings.Split(kafkaHost, ";")
  23 + }
  24 +}
@@ -3,6 +3,8 @@ package configs @@ -3,6 +3,8 @@ package configs
3 import ( 3 import (
4 "os" 4 "os"
5 "strings" 5 "strings"
  6 +
  7 + "gitlab.fjmaimaimai.com/mmm-go/partnermg/pkg/constant"
6 ) 8 )
7 9
8 type MqConfig struct { 10 type MqConfig struct {
@@ -10,7 +12,10 @@ type MqConfig struct { @@ -10,7 +12,10 @@ type MqConfig struct {
10 ConsumerId string `json:"consumerGroup"` 12 ConsumerId string `json:"consumerGroup"`
11 } 13 }
12 14
13 -var Cfg MqConfig 15 +var Cfg = MqConfig{
  16 + Servers: constant.KafkaCfg.Servers,
  17 + ConsumerId: constant.KafkaCfg.ConsumerId,
  18 +}
14 19
15 func init() { 20 func init() {
16 21
@@ -59,8 +59,9 @@ func (c *MessageConsumer) ConsumeClaim(groupSession sarama.ConsumerGroupSession, @@ -59,8 +59,9 @@ func (c *MessageConsumer) ConsumeClaim(groupSession sarama.ConsumerGroupSession,
59 } 59 }
60 if err = topicHandle(message); err != nil { 60 if err = topicHandle(message); err != nil {
61 logs.Error("Message claimed: kafka消息处理错误 topic =", message.Topic, message.Offset, err) 61 logs.Error("Message claimed: kafka消息处理错误 topic =", message.Topic, message.Offset, err)
  62 + } else {
  63 + groupSession.MarkMessage(message, "")
62 } 64 }
63 - groupSession.MarkMessage(message, "")  
64 } 65 }
65 return nil 66 return nil
66 } 67 }