作者 唐旭辉

.

@@ -4,6 +4,7 @@ import ( @@ -4,6 +4,7 @@ import (
4 "context" 4 "context"
5 "errors" 5 "errors"
6 "fmt" 6 "fmt"
  7 + "time"
7 8
8 "gitlab.fjmaimaimai.com/mmm-go/partnermg/pkg/port/consumer/configs" 9 "gitlab.fjmaimaimai.com/mmm-go/partnermg/pkg/port/consumer/configs"
9 10
@@ -90,93 +91,94 @@ func NewRuner() *Runer { @@ -90,93 +91,94 @@ func NewRuner() *Runer {
90 return r 91 return r
91 } 92 }
92 93
93 -// func (r *Runer) InitConsumer() error {  
94 -// config := sarama.NewConfig()  
95 -// //config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRoundRobin  
96 -// config.Consumer.Offsets.Initial = sarama.OffsetOldest  
97 -// config.Version = sarama.V0_10_2_1  
98 -// if err := config.Validate(); err != nil {  
99 -// msg := fmt.Sprintf("Kafka producer config invalidate. config: %v. err: %v", configs.Cfg, err)  
100 -// logs.Error(msg)  
101 -// panic(msg)  
102 -// }  
103 -  
104 -// consumerGroup, err := sarama.NewConsumerGroup(r.msgConsumer.kafkaHosts, r.msgConsumer.groupId, config)  
105 -// if err != nil {  
106 -// return err  
107 -// }  
108 -// r.consumerGroup = consumerGroup  
109 -// return nil  
110 -// }  
111 func (r *Runer) InitConsumer() error { 94 func (r *Runer) InitConsumer() error {
112 - clusterCfg := cluster.NewConfig()  
113 - clusterCfg.Consumer.Return.Errors = true  
114 - clusterCfg.Consumer.Offsets.Initial = sarama.OffsetOldest  
115 - clusterCfg.Group.Return.Notifications = true  
116 - clusterCfg.Version = sarama.V0_10_2_1  
117 - khosts := []string{"192.168.0.252:9092", "192.168.0.251:9092", "192.168.0.250:9092"}  
118 - groupid := "partnermg_dev"  
119 - topic := []string{"topic_test"}  
120 - // consumer, err := cluster.NewConsumer(r.msgConsumer.kafkaHosts, r.msgConsumer.groupId, r.msgConsumer.topics, clusterCfg)  
121 - consumer, err := cluster.NewConsumer(khosts, groupid, topic, clusterCfg)  
122 - if err != nil {  
123 - msg := fmt.Sprintf("Create kafka consumer error: %v. config: %v", err, clusterCfg) 95 + config := sarama.NewConfig()
  96 + //config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRoundRobin
  97 + config.Consumer.Offsets.Initial = sarama.OffsetOldest
  98 + config.Version = sarama.V0_10_2_1
  99 + if err := config.Validate(); err != nil {
  100 + msg := fmt.Sprintf("Kafka producer config invalidate. config: %v. err: %v", configs.Cfg, err)
124 logs.Error(msg) 101 logs.Error(msg)
125 panic(msg) 102 panic(msg)
126 } 103 }
127 - r.Consumer = consumer 104 +
  105 + consumerGroup, err := sarama.NewConsumerGroup(r.msgConsumer.kafkaHosts, r.msgConsumer.groupId, config)
  106 + if err != nil {
  107 + return err
  108 + }
  109 + r.consumerGroup = consumerGroup
128 return nil 110 return nil
129 } 111 }
130 112
131 -// func (r *Runer) Start(ctx context.Context) {  
132 -// defer func() {  
133 -// if e := recover(); e != nil {  
134 -// logs.Error(e)  
135 -// }  
136 -// }()  
137 -// for {  
138 -// select {  
139 -// case <-ctx.Done():  
140 -// logs.Warning("ctx cancel;consumerGroup.Close()")  
141 -// r.consumerGroup.Close()  
142 -// return  
143 -// default:  
144 -// if err := r.consumerGroup.Consume(ctx, r.msgConsumer.topics, r.msgConsumer); err != nil {  
145 -// logs.Error("consumerGroup err:%s \n", err)  
146 -// //等待重试  
147 -// timer := time.NewTimer(5 * time.Second)  
148 -// <-timer.C  
149 -// }  
150 -// r.msgConsumer.ready = make(chan struct{})  
151 -// }  
152 - 113 +// func (r *Runer) InitConsumer() error {
  114 +// clusterCfg := cluster.NewConfig()
  115 +// clusterCfg.Consumer.Return.Errors = true
  116 +// clusterCfg.Consumer.Offsets.Initial = sarama.OffsetOldest
  117 +// clusterCfg.Group.Return.Notifications = true
  118 +// clusterCfg.Version = sarama.V0_10_2_1
  119 +// khosts := []string{"192.168.0.252:9092", "192.168.0.251:9092", "192.168.0.250:9092"}
  120 +// groupid := "partnermg_dev"
  121 +// topic := []string{"topic_test"}
  122 +// // consumer, err := cluster.NewConsumer(r.msgConsumer.kafkaHosts, r.msgConsumer.groupId, r.msgConsumer.topics, clusterCfg)
  123 +// consumer, err := cluster.NewConsumer(khosts, groupid, topic, clusterCfg)
  124 +// if err != nil {
  125 +// msg := fmt.Sprintf("Create kafka consumer error: %v. config: %v", err, clusterCfg)
  126 +// logs.Error(msg)
  127 +// panic(msg)
153 // } 128 // }
  129 +// r.Consumer = consumer
  130 +// return nil
154 // } 131 // }
155 132
156 func (r *Runer) Start(ctx context.Context) { 133 func (r *Runer) Start(ctx context.Context) {
  134 + defer func() {
  135 + if e := recover(); e != nil {
  136 + logs.Error(e)
  137 + }
  138 + }()
157 for { 139 for {
158 select { 140 select {
159 - case msg, more := <-r.Consumer.Messages():  
160 - if more {  
161 - logs.Info("Partition:%d, Offset:%d, Key:%s, Value:%s Timestamp:%s\n", msg.Partition, msg.Offset, string(msg.Key), string(msg.Value), msg.Timestamp)  
162 - r.Consumer.MarkOffset(msg, "") // mark message as processed  
163 - }  
164 - case err, more := <-r.Consumer.Errors():  
165 - if more {  
166 - logs.Info("Kafka consumer error: %v", err.Error())  
167 - }  
168 - case ntf, more := <-r.Consumer.Notifications():  
169 - if more {  
170 - logs.Info("Kafka consumer rebalance: %v", ntf)  
171 - }  
172 case <-ctx.Done(): 141 case <-ctx.Done():
173 - logs.Info("Stop consumer server...")  
174 - r.Consumer.Close() 142 + logs.Warning("ctx cancel;consumerGroup.Close()")
  143 + r.consumerGroup.Close()
175 return 144 return
  145 + default:
  146 + if err := r.consumerGroup.Consume(ctx, r.msgConsumer.topics, r.msgConsumer); err != nil {
  147 + logs.Error("consumerGroup err:%s \n", err)
  148 + //等待重试
  149 + timer := time.NewTimer(5 * time.Second)
  150 + <-timer.C
  151 + }
  152 + r.msgConsumer.ready = make(chan struct{})
176 } 153 }
  154 +
177 } 155 }
178 } 156 }
179 157
  158 +// func (r *Runer) Start(ctx context.Context) {
  159 +// for {
  160 +// select {
  161 +// case msg, more := <-r.Consumer.Messages():
  162 +// if more {
  163 +// logs.Info("Partition:%d, Offset:%d, Key:%s, Value:%s Timestamp:%s\n", msg.Partition, msg.Offset, string(msg.Key), string(msg.Value), msg.Timestamp)
  164 +// r.Consumer.MarkOffset(msg, "") // mark message as processed
  165 +// }
  166 +// case err, more := <-r.Consumer.Errors():
  167 +// if more {
  168 +// logs.Info("Kafka consumer error: %v", err.Error())
  169 +// }
  170 +// case ntf, more := <-r.Consumer.Notifications():
  171 +// if more {
  172 +// logs.Info("Kafka consumer rebalance: %v", ntf)
  173 +// }
  174 +// case <-ctx.Done():
  175 +// logs.Info("Stop consumer server...")
  176 +// r.Consumer.Close()
  177 +// return
  178 +// }
  179 +// }
  180 +// }
  181 +
180 func (r *Runer) IsReady() <-chan struct{} { 182 func (r *Runer) IsReady() <-chan struct{} {
181 return r.msgConsumer.ready 183 return r.msgConsumer.ready
182 } 184 }