作者 唐旭辉

更新

... ... @@ -79,7 +79,7 @@ spec:
- name: BUSINESS_ADMIN_HOST
value: "http://suplus-business-admin-prd.fjmaimaimai.com"
- name: KAFKA_HOST
value: ""
value: "192.168.0.250:9092;192.168.0.251:9092;192.168.0.252:9092"
- name: KAFKA_CONSUMER_ID
value: "partnermg_prd"
volumes:
... ...
... ... @@ -76,7 +76,7 @@ spec:
- name: BUSINESS_ADMIN_HOST
value: "http://suplus-business-admin-test.fjmaimaimai.com"
- name: KAFKA_HOST
value: "106.52.15.41:9092"
value: "192.168.0.250:9092;192.168.0.251:9092;192.168.0.252:9092"
- name: KAFKA_CONSUMER_ID
value: "partnermg_test"
volumes:
... ...
... ... @@ -82,8 +82,6 @@ func NewRuner() *Runer {
groupId: configs.Cfg.ConsumerId,
topicsHandles: TopicHandleRouters,
topics: topics,
// beforeHandles: BeforeHandles,
// afterHandles: AfterHandles,
},
}
logs.Debug("kafka_host=%v; topic=%v;groupid=%s ", r.msgConsumer.kafkaHosts,
... ... @@ -94,7 +92,7 @@ func NewRuner() *Runer {
func (r *Runer) InitConsumer() error {
config := sarama.NewConfig()
//config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRoundRobin
config.Consumer.Offsets.Initial = sarama.OffsetOldest
config.Consumer.Offsets.Initial = sarama.OffsetNewest
config.Version = sarama.V0_10_2_1
if err := config.Validate(); err != nil {
msg := fmt.Sprintf("Kafka producer config invalidate. config: %v. err: %v", configs.Cfg, err)
... ...
package consumer
import (
"os"
"github.com/Shopify/sarama"
"github.com/astaxie/beego/logs"
"gitlab.fjmaimaimai.com/mmm-go/partnermg/pkg/port/consumer/handles"
)
//TopicHandle 处理kafka中得消息
type TopicHandle func(*sarama.ConsumerMessage) error
var BeforeHandles = []TopicHandle{}
var AfterHandles = []TopicHandle{}
//TopicHandleRouters 根据topic区分消息并进行处理
var TopicHandleRouters = map[string]TopicHandle{
"topic_test": func(message *sarama.ConsumerMessage) error {
... ... @@ -18,5 +18,25 @@ var TopicHandleRouters = map[string]TopicHandle{
message.Timestamp, message.Topic, message.Offset, string(message.Value))
return nil
},
//"xiangmi-orders": handles.DataFromXiangMi,
}
func init() {
var runEnv string
if os.Getenv("KAFKA_CONSUMER_ID") != "" {
runEnv = os.Getenv("KAFKA_CONSUMER_ID")
}
if runEnv == "partnermg_test" {
initHandleRoutersTest()
}
if runEnv == "partnermg_prd" {
initHandleRoutersProd()
}
}
func initHandleRoutersTest() {
TopicHandleRouters["xiangmi_project_test"] = handles.DataFromXiangMi
}
func initHandleRoutersProd() {
TopicHandleRouters["xiangmi_project"] = handles.DataFromXiangMi
}
... ...