作者 唐旭辉

.

@@ -115,12 +115,12 @@ func (r *Runer) InitConsumer() error { @@ -115,12 +115,12 @@ func (r *Runer) InitConsumer() error {
115 clusterCfg.Consumer.Offsets.Initial = sarama.OffsetOldest 115 clusterCfg.Consumer.Offsets.Initial = sarama.OffsetOldest
116 clusterCfg.Group.Return.Notifications = true 116 clusterCfg.Group.Return.Notifications = true
117 clusterCfg.Version = sarama.V0_10_2_1 117 clusterCfg.Version = sarama.V0_10_2_1
118 - khosts := []string{"192.168.0.252:9092", "192.168.0.251:9092", "192.168.0.250:9092"}  
119 - groupid := "partnermg_dev"  
120 - topic := []string{"topic_test"} 118 + // khosts := []string{"192.168.0.252:9092", "192.168.0.251:9092", "192.168.0.250:9092"}
  119 + // groupid := "partnermg_dev"
  120 + // topic := []string{"topic_test"}
121 logs.Debug(r.msgConsumer.kafkaHosts, r.msgConsumer.groupId, r.msgConsumer.topics) 121 logs.Debug(r.msgConsumer.kafkaHosts, r.msgConsumer.groupId, r.msgConsumer.topics)
122 - // consumer, err := cluster.NewConsumer(r.msgConsumer.kafkaHosts, r.msgConsumer.groupId, r.msgConsumer.topics, clusterCfg)  
123 - consumer, err := cluster.NewConsumer(khosts, groupid, topic, clusterCfg) 122 + consumer, err := cluster.NewConsumer(r.msgConsumer.kafkaHosts, r.msgConsumer.groupId, r.msgConsumer.topics, clusterCfg)
  123 + // consumer, err := cluster.NewConsumer(khosts, groupid, topic, clusterCfg)
124 if err != nil { 124 if err != nil {
125 msg := fmt.Sprintf("Create kafka consumer error: %v. config: %v", err, clusterCfg) 125 msg := fmt.Sprintf("Create kafka consumer error: %v. config: %v", err, clusterCfg)
126 logs.Error(msg) 126 logs.Error(msg)