作者 唐旭辉

.

@@ -4,7 +4,6 @@ import ( @@ -4,7 +4,6 @@ import (
4 "context" 4 "context"
5 "errors" 5 "errors"
6 "fmt" 6 "fmt"
7 - "time"  
8 7
9 "gitlab.fjmaimaimai.com/mmm-go/partnermg/pkg/port/consumer/configs" 8 "gitlab.fjmaimaimai.com/mmm-go/partnermg/pkg/port/consumer/configs"
10 9
@@ -91,94 +90,94 @@ func NewRuner() *Runer { @@ -91,94 +90,94 @@ func NewRuner() *Runer {
91 return r 90 return r
92 } 91 }
93 92
94 -func (r *Runer) InitConsumer() error {  
95 - config := sarama.NewConfig()  
96 - //config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRoundRobin  
97 - config.Consumer.Offsets.Initial = sarama.OffsetOldest  
98 - config.Version = sarama.V0_10_2_1  
99 - if err := config.Validate(); err != nil {  
100 - msg := fmt.Sprintf("Kafka producer config invalidate. config: %v. err: %v", configs.Cfg, err)  
101 - logs.Error(msg)  
102 - panic(msg)  
103 - }  
104 -  
105 - consumerGroup, err := sarama.NewConsumerGroup(r.msgConsumer.kafkaHosts, r.msgConsumer.groupId, config)  
106 - if err != nil {  
107 - return err  
108 - }  
109 - r.consumerGroup = consumerGroup  
110 - return nil  
111 -}  
112 -  
113 // func (r *Runer) InitConsumer() error { 93 // func (r *Runer) InitConsumer() error {
114 -// clusterCfg := cluster.NewConfig()  
115 -// clusterCfg.Consumer.Return.Errors = true  
116 -// clusterCfg.Consumer.Offsets.Initial = sarama.OffsetOldest  
117 -// clusterCfg.Group.Return.Notifications = true  
118 -// clusterCfg.Version = sarama.V0_10_2_1  
119 -// khosts := []string{"192.168.0.252:9092", "192.168.0.251:9092", "192.168.0.250:9092"}  
120 -// groupid := "partnermg_dev"  
121 -// topic := []string{"topic_test"}  
122 -// // consumer, err := cluster.NewConsumer(r.msgConsumer.kafkaHosts, r.msgConsumer.groupId, r.msgConsumer.topics, clusterCfg)  
123 -// consumer, err := cluster.NewConsumer(khosts, groupid, topic, clusterCfg)  
124 -// if err != nil {  
125 -// msg := fmt.Sprintf("Create kafka consumer error: %v. config: %v", err, clusterCfg) 94 +// config := sarama.NewConfig()
  95 +// //config.Consumer.Group.Rebalance.Strategy = sarama.BalanceStrategyRoundRobin
  96 +// config.Consumer.Offsets.Initial = sarama.OffsetOldest
  97 +// config.Version = sarama.V0_10_2_1
  98 +// if err := config.Validate(); err != nil {
  99 +// msg := fmt.Sprintf("Kafka producer config invalidate. config: %v. err: %v", configs.Cfg, err)
126 // logs.Error(msg) 100 // logs.Error(msg)
127 // panic(msg) 101 // panic(msg)
128 // } 102 // }
129 -// r.Consumer = consumer 103 +
  104 +// consumerGroup, err := sarama.NewConsumerGroup(r.msgConsumer.kafkaHosts, r.msgConsumer.groupId, config)
  105 +// if err != nil {
  106 +// return err
  107 +// }
  108 +// r.consumerGroup = consumerGroup
130 // return nil 109 // return nil
131 // } 110 // }
132 111
133 -func (r *Runer) Start(ctx context.Context) {  
134 - defer func() {  
135 - if e := recover(); e != nil {  
136 - logs.Error(e)  
137 - }  
138 - }()  
139 - for {  
140 - select {  
141 - case <-ctx.Done():  
142 - logs.Warning("ctx cancel;consumerGroup.Close()")  
143 - r.consumerGroup.Close()  
144 - return  
145 - default:  
146 - if err := r.consumerGroup.Consume(ctx, r.msgConsumer.topics, r.msgConsumer); err != nil {  
147 - logs.Error("consumerGroup err:%s \n", err)  
148 - //等待重试  
149 - timer := time.NewTimer(5 * time.Second)  
150 - <-timer.C  
151 - }  
152 - r.msgConsumer.ready = make(chan struct{})  
153 - }  
154 - 112 +func (r *Runer) InitConsumer() error {
  113 + clusterCfg := cluster.NewConfig()
  114 + clusterCfg.Consumer.Return.Errors = true
  115 + clusterCfg.Consumer.Offsets.Initial = sarama.OffsetOldest
  116 + clusterCfg.Group.Return.Notifications = true
  117 + clusterCfg.Version = sarama.V0_10_2_1
  118 + // khosts := []string{"192.168.0.252:9092", "192.168.0.251:9092", "192.168.0.250:9092"}
  119 + // groupid := "partnermg_dev"
  120 + // topic := []string{"topic_test"}
  121 + consumer, err := cluster.NewConsumer(r.msgConsumer.kafkaHosts, r.msgConsumer.groupId, r.msgConsumer.topics, clusterCfg)
  122 + // consumer, err := cluster.NewConsumer(khosts, groupid, topic, clusterCfg)
  123 + if err != nil {
  124 + msg := fmt.Sprintf("Create kafka consumer error: %v. config: %v", err, clusterCfg)
  125 + logs.Error(msg)
  126 + panic(msg)
155 } 127 }
  128 + r.Consumer = consumer
  129 + return nil
156 } 130 }
157 131
158 // func (r *Runer) Start(ctx context.Context) { 132 // func (r *Runer) Start(ctx context.Context) {
  133 +// defer func() {
  134 +// if e := recover(); e != nil {
  135 +// logs.Error(e)
  136 +// }
  137 +// }()
159 // for { 138 // for {
160 // select { 139 // select {
161 -// case msg, more := <-r.Consumer.Messages():  
162 -// if more {  
163 -// logs.Info("Partition:%d, Offset:%d, Key:%s, Value:%s Timestamp:%s\n", msg.Partition, msg.Offset, string(msg.Key), string(msg.Value), msg.Timestamp)  
164 -// r.Consumer.MarkOffset(msg, "") // mark message as processed  
165 -// }  
166 -// case err, more := <-r.Consumer.Errors():  
167 -// if more {  
168 -// logs.Info("Kafka consumer error: %v", err.Error())  
169 -// }  
170 -// case ntf, more := <-r.Consumer.Notifications():  
171 -// if more {  
172 -// logs.Info("Kafka consumer rebalance: %v", ntf)  
173 -// }  
174 // case <-ctx.Done(): 140 // case <-ctx.Done():
175 -// logs.Info("Stop consumer server...")  
176 -// r.Consumer.Close() 141 +// logs.Warning("ctx cancel;consumerGroup.Close()")
  142 +// r.consumerGroup.Close()
177 // return 143 // return
  144 +// default:
  145 +// if err := r.consumerGroup.Consume(ctx, r.msgConsumer.topics, r.msgConsumer); err != nil {
  146 +// logs.Error("consumerGroup err:%s \n", err)
  147 +// //等待重试
  148 +// timer := time.NewTimer(5 * time.Second)
  149 +// <-timer.C
  150 +// }
  151 +// r.msgConsumer.ready = make(chan struct{})
178 // } 152 // }
  153 +
179 // } 154 // }
180 // } 155 // }
181 156
  157 +func (r *Runer) Start(ctx context.Context) {
  158 + for {
  159 + select {
  160 + case msg, more := <-r.Consumer.Messages():
  161 + if more {
  162 + logs.Info("Partition:%d, Offset:%d, Key:%s, Value:%s Timestamp:%s\n", msg.Partition, msg.Offset, string(msg.Key), string(msg.Value), msg.Timestamp)
  163 + r.Consumer.MarkOffset(msg, "") // mark message as processed
  164 + }
  165 + case err, more := <-r.Consumer.Errors():
  166 + if more {
  167 + logs.Info("Kafka consumer error: %v", err.Error())
  168 + }
  169 + case ntf, more := <-r.Consumer.Notifications():
  170 + if more {
  171 + logs.Info("Kafka consumer rebalance: %v", ntf)
  172 + }
  173 + case <-ctx.Done():
  174 + logs.Info("Stop consumer server...")
  175 + r.Consumer.Close()
  176 + return
  177 + }
  178 + }
  179 +}
  180 +
182 func (r *Runer) IsReady() <-chan struct{} { 181 func (r *Runer) IsReady() <-chan struct{} {
183 return r.msgConsumer.ready 182 return r.msgConsumer.ready
184 } 183 }