|
@@ -41,6 +41,7 @@ func (c *MessageConsumer) ConsumeClaim(groupSession sarama.ConsumerGroupSession, |
|
@@ -41,6 +41,7 @@ func (c *MessageConsumer) ConsumeClaim(groupSession sarama.ConsumerGroupSession, |
41
|
err error
|
41
|
err error
|
42
|
)
|
42
|
)
|
43
|
for message := range groupClaim.Messages() {
|
43
|
for message := range groupClaim.Messages() {
|
|
|
44
|
+
|
44
|
logs.Debug("Done Message claimed: timestamp = %v, topic = %s offset = %v value = %v \n",
|
45
|
logs.Debug("Done Message claimed: timestamp = %v, topic = %s offset = %v value = %v \n",
|
45
|
message.Timestamp, message.Topic, message.Offset, string(message.Value))
|
46
|
message.Timestamp, message.Topic, message.Offset, string(message.Value))
|
46
|
for i := range c.beforeHandles {
|
47
|
for i := range c.beforeHandles {
|
|
@@ -118,7 +119,7 @@ func (r *Runer) Start(ctx context.Context) { |
|
@@ -118,7 +119,7 @@ func (r *Runer) Start(ctx context.Context) { |
118
|
r.consumerGroup.Close()
|
119
|
r.consumerGroup.Close()
|
119
|
return
|
120
|
return
|
120
|
default:
|
121
|
default:
|
121
|
-
|
122
|
+
|
122
|
}
|
123
|
}
|
123
|
if err := r.consumerGroup.Consume(ctx, r.msgConsumer.topics, r.msgConsumer); err != nil {
|
124
|
if err := r.consumerGroup.Consume(ctx, r.msgConsumer.topics, r.msgConsumer); err != nil {
|
124
|
logs.Error("consumerGroup err:%s \n", err)
|
125
|
logs.Error("consumerGroup err:%s \n", err)
|