作者 唐旭辉

,,

@@ -4,6 +4,7 @@ import ( @@ -4,6 +4,7 @@ import (
4 "context" 4 "context"
5 "errors" 5 "errors"
6 "fmt" 6 "fmt"
  7 + "time"
7 8
8 "gitlab.fjmaimaimai.com/mmm-go/partnermg/pkg/port/consumer/configs" 9 "gitlab.fjmaimaimai.com/mmm-go/partnermg/pkg/port/consumer/configs"
9 10
@@ -90,95 +91,95 @@ func NewRuner() *Runer { @@ -90,95 +91,95 @@ func NewRuner() *Runer {
90 return r 91 return r
91 } 92 }
92 93
93 -// func (r *Runer) InitConsumer() error {  
94 -// config := sarama.NewConfig()  
95 -// //config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRoundRobin  
96 -// config.Consumer.Offsets.Initial = sarama.OffsetOldest  
97 -// config.Version = sarama.V0_10_2_1  
98 -// if err := config.Validate(); err != nil {  
99 -// msg := fmt.Sprintf("Kafka producer config invalidate. config: %v. err: %v", configs.Cfg, err)  
100 -// logs.Error(msg)  
101 -// panic(msg)  
102 -// }  
103 -  
104 -// consumerGroup, err := sarama.NewConsumerGroup(r.msgConsumer.kafkaHosts, r.msgConsumer.groupId, config)  
105 -// if err != nil {  
106 -// return err  
107 -// }  
108 -// r.consumerGroup = consumerGroup  
109 -// return nil  
110 -// }  
111 -  
112 func (r *Runer) InitConsumer() error { 94 func (r *Runer) InitConsumer() error {
113 - clusterCfg := cluster.NewConfig()  
114 - clusterCfg.Consumer.Return.Errors = true  
115 - clusterCfg.Consumer.Offsets.Initial = sarama.OffsetOldest  
116 - clusterCfg.Group.Return.Notifications = true  
117 - clusterCfg.Version = sarama.V0_10_2_1  
118 - // khosts := []string{"192.168.0.252:9092", "192.168.0.251:9092", "192.168.0.250:9092"}  
119 - // groupid := "partnermg_dev"  
120 - // topic := []string{"topic_test"}  
121 - logs.Debug(r.msgConsumer.kafkaHosts, r.msgConsumer.groupId, r.msgConsumer.topics)  
122 - consumer, err := cluster.NewConsumer(r.msgConsumer.kafkaHosts, r.msgConsumer.groupId, r.msgConsumer.topics, clusterCfg)  
123 - // consumer, err := cluster.NewConsumer(khosts, groupid, topic, clusterCfg)  
124 - if err != nil {  
125 - msg := fmt.Sprintf("Create kafka consumer error: %v. config: %v", err, clusterCfg) 95 + config := sarama.NewConfig()
  96 + //config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRoundRobin
  97 + config.Consumer.Offsets.Initial = sarama.OffsetOldest
  98 + config.Version = sarama.V0_10_2_1
  99 + if err := config.Validate(); err != nil {
  100 + msg := fmt.Sprintf("Kafka producer config invalidate. config: %v. err: %v", configs.Cfg, err)
126 logs.Error(msg) 101 logs.Error(msg)
127 panic(msg) 102 panic(msg)
128 } 103 }
129 - r.Consumer = consumer 104 +
  105 + consumerGroup, err := sarama.NewConsumerGroup(r.msgConsumer.kafkaHosts, r.msgConsumer.groupId, config)
  106 + if err != nil {
  107 + return err
  108 + }
  109 + r.consumerGroup = consumerGroup
130 return nil 110 return nil
131 } 111 }
132 112
133 -// func (r *Runer) Start(ctx context.Context) {  
134 -// defer func() {  
135 -// if e := recover(); e != nil {  
136 -// logs.Error(e)  
137 -// }  
138 -// }()  
139 -// for {  
140 -// select {  
141 -// case <-ctx.Done():  
142 -// logs.Warning("ctx cancel;consumerGroup.Close()")  
143 -// r.consumerGroup.Close()  
144 -// return  
145 -// default:  
146 -// if err := r.consumerGroup.Consume(ctx, r.msgConsumer.topics, r.msgConsumer); err != nil {  
147 -// logs.Error("consumerGroup err:%s \n", err)  
148 -// //等待重试  
149 -// timer := time.NewTimer(5 * time.Second)  
150 -// <-timer.C  
151 -// }  
152 -// r.msgConsumer.ready = make(chan struct{})  
153 -// }  
154 - 113 +// func (r *Runer) InitConsumer() error {
  114 +// clusterCfg := cluster.NewConfig()
  115 +// clusterCfg.Consumer.Return.Errors = true
  116 +// clusterCfg.Consumer.Offsets.Initial = sarama.OffsetOldest
  117 +// clusterCfg.Group.Return.Notifications = true
  118 +// clusterCfg.Version = sarama.V0_10_2_1
  119 +// // khosts := []string{"192.168.0.252:9092", "192.168.0.251:9092", "192.168.0.250:9092"}
  120 +// // groupid := "partnermg_dev"
  121 +// // topic := []string{"topic_test"}
  122 +// logs.Debug(r.msgConsumer.kafkaHosts, r.msgConsumer.groupId, r.msgConsumer.topics)
  123 +// consumer, err := cluster.NewConsumer(r.msgConsumer.kafkaHosts, r.msgConsumer.groupId, r.msgConsumer.topics, clusterCfg)
  124 +// // consumer, err := cluster.NewConsumer(khosts, groupid, topic, clusterCfg)
  125 +// if err != nil {
  126 +// msg := fmt.Sprintf("Create kafka consumer error: %v. config: %v", err, clusterCfg)
  127 +// logs.Error(msg)
  128 +// panic(msg)
155 // } 129 // }
  130 +// r.Consumer = consumer
  131 +// return nil
156 // } 132 // }
157 133
158 func (r *Runer) Start(ctx context.Context) { 134 func (r *Runer) Start(ctx context.Context) {
  135 + defer func() {
  136 + if e := recover(); e != nil {
  137 + logs.Error(e)
  138 + }
  139 + }()
159 for { 140 for {
160 select { 141 select {
161 - case msg, more := <-r.Consumer.Messages():  
162 - if more {  
163 - logs.Info("Partition:%d, Offset:%d, Key:%s, Value:%s Timestamp:%s\n", msg.Partition, msg.Offset, string(msg.Key), string(msg.Value), msg.Timestamp)  
164 - r.Consumer.MarkOffset(msg, "") // mark message as processed  
165 - }  
166 - case err, more := <-r.Consumer.Errors():  
167 - if more {  
168 - logs.Info("Kafka consumer error: %v", err.Error())  
169 - }  
170 - case ntf, more := <-r.Consumer.Notifications():  
171 - if more {  
172 - logs.Info("Kafka consumer rebalance: %v", ntf)  
173 - }  
174 case <-ctx.Done(): 142 case <-ctx.Done():
175 - logs.Info("Stop consumer server...")  
176 - r.Consumer.Close() 143 + logs.Warning("ctx cancel;consumerGroup.Close()")
  144 + r.consumerGroup.Close()
177 return 145 return
  146 + default:
  147 + if err := r.consumerGroup.Consume(ctx, r.msgConsumer.topics, r.msgConsumer); err != nil {
  148 + logs.Error("consumerGroup err:%s \n", err)
  149 + //等待重试
  150 + timer := time.NewTimer(5 * time.Second)
  151 + <-timer.C
178 } 152 }
  153 + r.msgConsumer.ready = make(chan struct{})
  154 + }
  155 +
179 } 156 }
180 } 157 }
181 158
  159 +// func (r *Runer) Start(ctx context.Context) {
  160 +// for {
  161 +// select {
  162 +// case msg, more := <-r.Consumer.Messages():
  163 +// if more {
  164 +// logs.Info("Partition:%d, Offset:%d, Key:%s, Value:%s Timestamp:%s\n", msg.Partition, msg.Offset, string(msg.Key), string(msg.Value), msg.Timestamp)
  165 +// r.Consumer.MarkOffset(msg, "") // mark message as processed
  166 +// }
  167 +// case err, more := <-r.Consumer.Errors():
  168 +// if more {
  169 +// logs.Info("Kafka consumer error: %v", err.Error())
  170 +// }
  171 +// case ntf, more := <-r.Consumer.Notifications():
  172 +// if more {
  173 +// logs.Info("Kafka consumer rebalance: %v", ntf)
  174 +// }
  175 +// case <-ctx.Done():
  176 +// logs.Info("Stop consumer server...")
  177 +// r.Consumer.Close()
  178 +// return
  179 +// }
  180 +// }
  181 +// }
  182 +
182 func (r *Runer) IsReady() <-chan struct{} { 183 func (r *Runer) IsReady() <-chan struct{} {
183 return r.msgConsumer.ready 184 return r.msgConsumer.ready
184 } 185 }