作者 唐旭辉

..

@@ -115,12 +115,12 @@ func (r *Runer) InitConsumer() error { @@ -115,12 +115,12 @@ func (r *Runer) InitConsumer() error {
115 clusterCfg.Consumer.Offsets.Initial = sarama.OffsetOldest 115 clusterCfg.Consumer.Offsets.Initial = sarama.OffsetOldest
116 clusterCfg.Group.Return.Notifications = true 116 clusterCfg.Group.Return.Notifications = true
117 clusterCfg.Version = sarama.V0_10_2_1 117 clusterCfg.Version = sarama.V0_10_2_1
118 - // khosts := []string{"192.168.0.252:9092", "192.168.0.251:9092", "192.168.0.250:9092"}  
119 - // groupid := "partnermg_dev"  
120 - // topic := []string{"topic_test"} 118 + khosts := []string{"192.168.0.252:9092", "192.168.0.251:9092", "192.168.0.250:9092"}
  119 + groupid := "partnermg_dev"
  120 + topic := []string{"topic_test"}
121 logs.Debug(r.msgConsumer.kafkaHosts, r.msgConsumer.groupId, r.msgConsumer.topics) 121 logs.Debug(r.msgConsumer.kafkaHosts, r.msgConsumer.groupId, r.msgConsumer.topics)
122 - consumer, err := cluster.NewConsumer(r.msgConsumer.kafkaHosts, r.msgConsumer.groupId, r.msgConsumer.topics, clusterCfg)  
123 - // consumer, err := cluster.NewConsumer(khosts, groupid, topic, clusterCfg) 122 + // consumer, err := cluster.NewConsumer(r.msgConsumer.kafkaHosts, r.msgConsumer.groupId, r.msgConsumer.topics, clusterCfg)
  123 + consumer, err := cluster.NewConsumer(khosts, groupid, topic, clusterCfg)
124 if err != nil { 124 if err != nil {
125 msg := fmt.Sprintf("Create kafka consumer error: %v. config: %v", err, clusterCfg) 125 msg := fmt.Sprintf("Create kafka consumer error: %v. config: %v", err, clusterCfg)
126 logs.Error(msg) 126 logs.Error(msg)